mito2/
region.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Mito region.
16
17pub mod opener;
18pub mod options;
19pub(crate) mod version;
20
21use std::collections::hash_map::Entry;
22use std::collections::HashMap;
23use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
24use std::sync::{Arc, RwLock};
25
26use common_telemetry::{error, info, warn};
27use crossbeam_utils::atomic::AtomicCell;
28use snafu::{ensure, OptionExt};
29use store_api::codec::PrimaryKeyEncoding;
30use store_api::logstore::provider::Provider;
31use store_api::metadata::RegionMetadataRef;
32use store_api::region_engine::{
33    RegionManifestInfo, RegionRole, RegionStatistic, SettableRegionRoleState,
34};
35use store_api::storage::{RegionId, SequenceNumber};
36use store_api::ManifestVersion;
37
38use crate::access_layer::AccessLayerRef;
39use crate::error::{
40    FlushableRegionStateSnafu, RegionNotFoundSnafu, RegionStateSnafu, RegionTruncatedSnafu, Result,
41    UpdateManifestSnafu,
42};
43use crate::manifest::action::{RegionManifest, RegionMetaAction, RegionMetaActionList};
44use crate::manifest::manager::RegionManifestManager;
45use crate::memtable::MemtableBuilderRef;
46use crate::region::version::{VersionControlRef, VersionRef};
47use crate::request::{OnFailure, OptionOutputTx};
48use crate::sst::file_purger::FilePurgerRef;
49use crate::time_provider::TimeProviderRef;
50
51/// This is the approximate factor to estimate the size of wal.
52const ESTIMATED_WAL_FACTOR: f32 = 0.42825;
53
54/// Region status include region id, memtable usage, sst usage, wal usage and manifest usage.
55#[derive(Debug)]
56pub struct RegionUsage {
57    pub region_id: RegionId,
58    pub wal_usage: u64,
59    pub sst_usage: u64,
60    pub manifest_usage: u64,
61}
62
63impl RegionUsage {
64    pub fn disk_usage(&self) -> u64 {
65        self.wal_usage + self.sst_usage + self.manifest_usage
66    }
67}
68
69#[derive(Debug, Clone, Copy, PartialEq, Eq)]
70pub enum RegionLeaderState {
71    /// The region is opened and is writable.
72    Writable,
73    /// The region is altering.
74    Altering,
75    /// The region is dropping.
76    Dropping,
77    /// The region is truncating.
78    Truncating,
79    /// The region is handling a region edit.
80    Editing,
81    /// The region is stepping down.
82    Downgrading,
83}
84
85#[derive(Debug, Clone, Copy, PartialEq, Eq)]
86pub enum RegionRoleState {
87    Leader(RegionLeaderState),
88    Follower,
89}
90
91/// Metadata and runtime status of a region.
92///
93/// Writing and reading a region follow a single-writer-multi-reader rule:
94/// - Only the region worker thread this region belongs to can modify the metadata.
95/// - Multiple reader threads are allowed to read a specific `version` of a region.
96#[derive(Debug)]
97pub struct MitoRegion {
98    /// Id of this region.
99    ///
100    /// Accessing region id from the version control is inconvenient so
101    /// we also store it here.
102    pub(crate) region_id: RegionId,
103
104    /// Version controller for this region.
105    ///
106    /// We MUST update the version control inside the write lock of the region manifest manager.
107    pub(crate) version_control: VersionControlRef,
108    /// SSTs accessor for this region.
109    pub(crate) access_layer: AccessLayerRef,
110    /// Context to maintain manifest for this region.
111    pub(crate) manifest_ctx: ManifestContextRef,
112    /// SST file purger.
113    pub(crate) file_purger: FilePurgerRef,
114    /// The provider of log store.
115    pub(crate) provider: Provider,
116    /// Last flush time in millis.
117    last_flush_millis: AtomicI64,
118    /// Last compaction time in millis.
119    last_compaction_millis: AtomicI64,
120    /// Provider to get current time.
121    time_provider: TimeProviderRef,
122    /// The topic's latest entry id since the region's last flushing.
123    /// **Only used for remote WAL pruning.**
124    ///
125    /// The value will be updated to the high watermark of the topic
126    /// if region receives a flush request or schedules a periodic flush task
127    /// and the region's memtable is empty.    
128    ///
129    /// There are no WAL entries in range [flushed_entry_id, topic_latest_entry_id] for current region,
130    /// which means these WAL entries maybe able to be pruned up to `topic_latest_entry_id`.
131    pub(crate) topic_latest_entry_id: AtomicU64,
132    /// Memtable builder for the region.
133    pub(crate) memtable_builder: MemtableBuilderRef,
134    /// manifest stats
135    stats: ManifestStats,
136}
137
138pub type MitoRegionRef = Arc<MitoRegion>;
139
140impl MitoRegion {
141    /// Stop background managers for this region.
142    pub(crate) async fn stop(&self) {
143        self.manifest_ctx
144            .manifest_manager
145            .write()
146            .await
147            .stop()
148            .await;
149
150        info!(
151            "Stopped region manifest manager, region_id: {}",
152            self.region_id
153        );
154    }
155
156    /// Returns current metadata of the region.
157    pub(crate) fn metadata(&self) -> RegionMetadataRef {
158        let version_data = self.version_control.current();
159        version_data.version.metadata.clone()
160    }
161
162    /// Returns primary key encoding of the region.
163    pub(crate) fn primary_key_encoding(&self) -> PrimaryKeyEncoding {
164        let version_data = self.version_control.current();
165        version_data.version.metadata.primary_key_encoding
166    }
167
168    /// Returns current version of the region.
169    pub(crate) fn version(&self) -> VersionRef {
170        let version_data = self.version_control.current();
171        version_data.version
172    }
173
174    /// Returns last flush timestamp in millis.
175    pub(crate) fn last_flush_millis(&self) -> i64 {
176        self.last_flush_millis.load(Ordering::Relaxed)
177    }
178
179    /// Update flush time to current time.
180    pub(crate) fn update_flush_millis(&self) {
181        let now = self.time_provider.current_time_millis();
182        self.last_flush_millis.store(now, Ordering::Relaxed);
183    }
184
185    /// Return last compaction time in millis.
186    pub(crate) fn last_compaction_millis(&self) -> i64 {
187        self.last_compaction_millis.load(Ordering::Relaxed)
188    }
189
190    /// Update compaction time to now millis.
191    pub(crate) fn update_compaction_millis(&self) {
192        let now = self.time_provider.current_time_millis();
193        self.last_compaction_millis.store(now, Ordering::Relaxed);
194    }
195
196    /// Returns the table dir.
197    pub(crate) fn table_dir(&self) -> &str {
198        self.access_layer.table_dir()
199    }
200
201    /// Returns whether the region is writable.
202    pub(crate) fn is_writable(&self) -> bool {
203        self.manifest_ctx.state.load() == RegionRoleState::Leader(RegionLeaderState::Writable)
204    }
205
206    /// Returns whether the region is flushable.
207    pub(crate) fn is_flushable(&self) -> bool {
208        matches!(
209            self.manifest_ctx.state.load(),
210            RegionRoleState::Leader(RegionLeaderState::Writable)
211                | RegionRoleState::Leader(RegionLeaderState::Downgrading)
212        )
213    }
214
215    /// Returns whether the region is downgrading.
216    pub(crate) fn is_downgrading(&self) -> bool {
217        matches!(
218            self.manifest_ctx.state.load(),
219            RegionRoleState::Leader(RegionLeaderState::Downgrading)
220        )
221    }
222
223    pub fn region_id(&self) -> RegionId {
224        self.region_id
225    }
226
227    pub fn find_committed_sequence(&self) -> SequenceNumber {
228        self.version_control.committed_sequence()
229    }
230
231    /// Returns whether the region is readonly.
232    pub fn is_follower(&self) -> bool {
233        self.manifest_ctx.state.load() == RegionRoleState::Follower
234    }
235
236    /// Returns the state of the region.
237    pub(crate) fn state(&self) -> RegionRoleState {
238        self.manifest_ctx.state.load()
239    }
240
241    /// Sets the region role state.
242    pub(crate) fn set_role(&self, next_role: RegionRole) {
243        self.manifest_ctx.set_role(next_role, self.region_id);
244    }
245
246    /// Sets the altering state.
247    /// You should call this method in the worker loop.
248    pub(crate) fn set_altering(&self) -> Result<()> {
249        self.compare_exchange_state(
250            RegionLeaderState::Writable,
251            RegionRoleState::Leader(RegionLeaderState::Altering),
252        )
253    }
254
255    /// Sets the dropping state.
256    /// You should call this method in the worker loop.
257    pub(crate) fn set_dropping(&self) -> Result<()> {
258        self.compare_exchange_state(
259            RegionLeaderState::Writable,
260            RegionRoleState::Leader(RegionLeaderState::Dropping),
261        )
262    }
263
264    /// Sets the truncating state.
265    /// You should call this method in the worker loop.
266    pub(crate) fn set_truncating(&self) -> Result<()> {
267        self.compare_exchange_state(
268            RegionLeaderState::Writable,
269            RegionRoleState::Leader(RegionLeaderState::Truncating),
270        )
271    }
272
273    /// Sets the editing state.
274    /// You should call this method in the worker loop.
275    pub(crate) fn set_editing(&self) -> Result<()> {
276        self.compare_exchange_state(
277            RegionLeaderState::Writable,
278            RegionRoleState::Leader(RegionLeaderState::Editing),
279        )
280    }
281
282    /// Sets the region to readonly gracefully. This acquires the manifest write lock.
283    pub(crate) async fn set_role_state_gracefully(&self, state: SettableRegionRoleState) {
284        let _manager = self.manifest_ctx.manifest_manager.write().await;
285        // We acquires the write lock of the manifest manager to ensure that no one is updating the manifest.
286        // Then we change the state.
287        self.set_role(state.into());
288    }
289
290    /// Switches the region state to `RegionRoleState::Leader(RegionLeaderState::Writable)` if the current state is `expect`.
291    /// Otherwise, logs an error.
292    pub(crate) fn switch_state_to_writable(&self, expect: RegionLeaderState) {
293        if let Err(e) = self
294            .compare_exchange_state(expect, RegionRoleState::Leader(RegionLeaderState::Writable))
295        {
296            error!(e; "failed to switch region state to writable, expect state is {:?}", expect);
297        }
298    }
299
300    /// Returns the region statistic.
301    pub(crate) fn region_statistic(&self) -> RegionStatistic {
302        let version = self.version();
303        let memtables = &version.memtables;
304        let memtable_usage = (memtables.mutable_usage() + memtables.immutables_usage()) as u64;
305
306        let sst_usage = version.ssts.sst_usage();
307        let index_usage = version.ssts.index_usage();
308        let flushed_entry_id = version.flushed_entry_id;
309
310        let wal_usage = self.estimated_wal_usage(memtable_usage);
311        let manifest_usage = self.stats.total_manifest_size();
312        let num_rows = version.ssts.num_rows() + version.memtables.num_rows();
313        let num_files = version.ssts.num_files();
314        let manifest_version = self.stats.manifest_version();
315
316        let topic_latest_entry_id = self.topic_latest_entry_id.load(Ordering::Relaxed);
317
318        RegionStatistic {
319            num_rows,
320            memtable_size: memtable_usage,
321            wal_size: wal_usage,
322            manifest_size: manifest_usage,
323            sst_size: sst_usage,
324            sst_num: num_files,
325            index_size: index_usage,
326            manifest: RegionManifestInfo::Mito {
327                manifest_version,
328                flushed_entry_id,
329            },
330            data_topic_latest_entry_id: topic_latest_entry_id,
331            metadata_topic_latest_entry_id: topic_latest_entry_id,
332        }
333    }
334
335    /// Estimated WAL size in bytes.
336    /// Use the memtables size to estimate the size of wal.
337    fn estimated_wal_usage(&self, memtable_usage: u64) -> u64 {
338        ((memtable_usage as f32) * ESTIMATED_WAL_FACTOR) as u64
339    }
340
341    /// Sets the state of the region to given state if the current state equals to
342    /// the expected.
343    fn compare_exchange_state(
344        &self,
345        expect: RegionLeaderState,
346        state: RegionRoleState,
347    ) -> Result<()> {
348        self.manifest_ctx
349            .state
350            .compare_exchange(RegionRoleState::Leader(expect), state)
351            .map_err(|actual| {
352                RegionStateSnafu {
353                    region_id: self.region_id,
354                    state: actual,
355                    expect: RegionRoleState::Leader(expect),
356                }
357                .build()
358            })?;
359        Ok(())
360    }
361}
362
363/// Context to update the region manifest.
364#[derive(Debug)]
365pub(crate) struct ManifestContext {
366    /// Manager to maintain manifest for this region.
367    manifest_manager: tokio::sync::RwLock<RegionManifestManager>,
368    /// The state of the region. The region checks the state before updating
369    /// manifest.
370    state: AtomicCell<RegionRoleState>,
371}
372
373impl ManifestContext {
374    pub(crate) fn new(manager: RegionManifestManager, state: RegionRoleState) -> Self {
375        ManifestContext {
376            manifest_manager: tokio::sync::RwLock::new(manager),
377            state: AtomicCell::new(state),
378        }
379    }
380
381    pub(crate) async fn manifest_version(&self) -> ManifestVersion {
382        self.manifest_manager
383            .read()
384            .await
385            .manifest()
386            .manifest_version
387    }
388
389    pub(crate) async fn has_update(&self) -> Result<bool> {
390        self.manifest_manager.read().await.has_update().await
391    }
392
393    /// Installs the manifest changes from the current version to the target version (inclusive).
394    ///
395    /// Returns installed [RegionManifest].
396    /// **Note**: This function is not guaranteed to install the target version strictly.
397    /// The installed version may be greater than the target version.
398    pub(crate) async fn install_manifest_to(
399        &self,
400        version: ManifestVersion,
401    ) -> Result<Arc<RegionManifest>> {
402        let mut manager = self.manifest_manager.write().await;
403        manager.install_manifest_to(version).await?;
404
405        Ok(manager.manifest())
406    }
407
408    /// Updates the manifest if current state is `expect_state`.
409    pub(crate) async fn update_manifest(
410        &self,
411        expect_state: RegionLeaderState,
412        action_list: RegionMetaActionList,
413    ) -> Result<ManifestVersion> {
414        // Acquires the write lock of the manifest manager.
415        let mut manager = self.manifest_manager.write().await;
416        // Gets current manifest.
417        let manifest = manager.manifest();
418        // Checks state inside the lock. This is to ensure that we won't update the manifest
419        // after `set_readonly_gracefully()` is called.
420        let current_state = self.state.load();
421
422        // If expect_state is not downgrading, the current state must be either `expect_state` or downgrading.
423        //
424        // A downgrading leader rejects user writes but still allows
425        // flushing the memtable and updating the manifest.
426        if expect_state != RegionLeaderState::Downgrading {
427            if current_state == RegionRoleState::Leader(RegionLeaderState::Downgrading) {
428                info!(
429                    "Region {} is in downgrading leader state, updating manifest. state is {:?}",
430                    manifest.metadata.region_id, expect_state
431                );
432            }
433            ensure!(
434                current_state == RegionRoleState::Leader(expect_state)
435                    || current_state == RegionRoleState::Leader(RegionLeaderState::Downgrading),
436                UpdateManifestSnafu {
437                    region_id: manifest.metadata.region_id,
438                    state: current_state,
439                }
440            );
441        } else {
442            ensure!(
443                current_state == RegionRoleState::Leader(expect_state),
444                RegionStateSnafu {
445                    region_id: manifest.metadata.region_id,
446                    state: current_state,
447                    expect: RegionRoleState::Leader(expect_state),
448                }
449            );
450        }
451
452        for action in &action_list.actions {
453            // Checks whether the edit is still applicable.
454            let RegionMetaAction::Edit(edit) = &action else {
455                continue;
456            };
457
458            // Checks whether the region is truncated.
459            let Some(truncated_entry_id) = manifest.truncated_entry_id else {
460                continue;
461            };
462
463            // This is an edit from flush.
464            if let Some(flushed_entry_id) = edit.flushed_entry_id {
465                ensure!(
466                    truncated_entry_id < flushed_entry_id,
467                    RegionTruncatedSnafu {
468                        region_id: manifest.metadata.region_id,
469                    }
470                );
471            }
472
473            // This is an edit from compaction.
474            if !edit.files_to_remove.is_empty() {
475                // Input files of the compaction task has been truncated.
476                for file in &edit.files_to_remove {
477                    ensure!(
478                        manifest.files.contains_key(&file.file_id),
479                        RegionTruncatedSnafu {
480                            region_id: manifest.metadata.region_id,
481                        }
482                    );
483                }
484            }
485        }
486
487        // Now we can update the manifest.
488        let version = manager.update(action_list).await.inspect_err(
489            |e| error!(e; "Failed to update manifest, region_id: {}", manifest.metadata.region_id),
490        )?;
491
492        if self.state.load() == RegionRoleState::Follower {
493            warn!(
494                "Region {} becomes follower while updating manifest which may cause inconsistency, manifest version: {version}",
495                manifest.metadata.region_id
496            );
497        }
498
499        Ok(version)
500    }
501
502    /// Sets the [`RegionRole`].
503    ///
504    /// ```
505    ///     +------------------------------------------+
506    ///     |                      +-----------------+ |
507    ///     |                      |                 | |
508    /// +---+------+       +-------+-----+        +--v-v---+
509    /// | Follower |       | Downgrading |        | Leader |
510    /// +---^-^----+       +-----+-^-----+        +--+-+---+
511    ///     | |                  | |                 | |
512    ///     | +------------------+ +-----------------+ |
513    ///     +------------------------------------------+
514    ///
515    /// Transition:
516    /// - Follower -> Leader
517    /// - Downgrading Leader -> Leader
518    /// - Leader -> Follower
519    /// - Downgrading Leader -> Follower
520    /// - Leader -> Downgrading Leader
521    ///
522    /// ```
523    pub(crate) fn set_role(&self, next_role: RegionRole, region_id: RegionId) {
524        match next_role {
525            RegionRole::Follower => {
526                match self.state.fetch_update(|state| {
527                    if !matches!(state, RegionRoleState::Follower) {
528                        Some(RegionRoleState::Follower)
529                    } else {
530                        None
531                    }
532                }) {
533                    Ok(state) => info!(
534                        "Convert region {} to follower, previous role state: {:?}",
535                        region_id, state
536                    ),
537                    Err(state) => {
538                        if state != RegionRoleState::Follower {
539                            warn!(
540                                "Failed to convert region {} to follower, current role state: {:?}",
541                                region_id, state
542                            )
543                        }
544                    }
545                }
546            }
547            RegionRole::Leader => {
548                match self.state.fetch_update(|state| {
549                    if matches!(
550                        state,
551                        RegionRoleState::Follower
552                            | RegionRoleState::Leader(RegionLeaderState::Downgrading)
553                    ) {
554                        Some(RegionRoleState::Leader(RegionLeaderState::Writable))
555                    } else {
556                        None
557                    }
558                }) {
559                    Ok(state) => info!(
560                        "Convert region {} to leader, previous role state: {:?}",
561                        region_id, state
562                    ),
563                    Err(state) => {
564                        if state != RegionRoleState::Leader(RegionLeaderState::Writable) {
565                            warn!(
566                                "Failed to convert region {} to leader, current role state: {:?}",
567                                region_id, state
568                            )
569                        }
570                    }
571                }
572            }
573            RegionRole::DowngradingLeader => {
574                match self.state.compare_exchange(
575                    RegionRoleState::Leader(RegionLeaderState::Writable),
576                    RegionRoleState::Leader(RegionLeaderState::Downgrading),
577                ) {
578                    Ok(state) => info!(
579                        "Convert region {} to downgrading region, previous role state: {:?}",
580                        region_id, state
581                    ),
582                    Err(state) => {
583                        if state != RegionRoleState::Leader(RegionLeaderState::Downgrading) {
584                            warn!(
585                                "Failed to convert region {} to downgrading leader, current role state: {:?}",
586                                region_id, state
587                            )
588                        }
589                    }
590                }
591            }
592        }
593    }
594}
595
596#[cfg(test)]
597impl ManifestContext {
598    pub(crate) async fn manifest(&self) -> Arc<crate::manifest::action::RegionManifest> {
599        self.manifest_manager.read().await.manifest()
600    }
601}
602
603pub(crate) type ManifestContextRef = Arc<ManifestContext>;
604
605/// Regions indexed by ids.
606#[derive(Debug, Default)]
607pub(crate) struct RegionMap {
608    regions: RwLock<HashMap<RegionId, MitoRegionRef>>,
609}
610
611impl RegionMap {
612    /// Returns true if the region exists.
613    pub(crate) fn is_region_exists(&self, region_id: RegionId) -> bool {
614        let regions = self.regions.read().unwrap();
615        regions.contains_key(&region_id)
616    }
617
618    /// Inserts a new region into the map.
619    pub(crate) fn insert_region(&self, region: MitoRegionRef) {
620        let mut regions = self.regions.write().unwrap();
621        regions.insert(region.region_id, region);
622    }
623
624    /// Gets region by region id.
625    pub(crate) fn get_region(&self, region_id: RegionId) -> Option<MitoRegionRef> {
626        let regions = self.regions.read().unwrap();
627        regions.get(&region_id).cloned()
628    }
629
630    /// Gets writable region by region id.
631    ///
632    /// Returns error if the region does not exist or is readonly.
633    pub(crate) fn writable_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
634        let region = self
635            .get_region(region_id)
636            .context(RegionNotFoundSnafu { region_id })?;
637        ensure!(
638            region.is_writable(),
639            RegionStateSnafu {
640                region_id,
641                state: region.state(),
642                expect: RegionRoleState::Leader(RegionLeaderState::Writable),
643            }
644        );
645        Ok(region)
646    }
647
648    /// Gets readonly region by region id.
649    ///
650    /// Returns error if the region does not exist or is writable.
651    pub(crate) fn follower_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
652        let region = self
653            .get_region(region_id)
654            .context(RegionNotFoundSnafu { region_id })?;
655        ensure!(
656            region.is_follower(),
657            RegionStateSnafu {
658                region_id,
659                state: region.state(),
660                expect: RegionRoleState::Follower,
661            }
662        );
663
664        Ok(region)
665    }
666
667    /// Gets region by region id.
668    ///
669    /// Calls the callback if the region does not exist.
670    pub(crate) fn get_region_or<F: OnFailure>(
671        &self,
672        region_id: RegionId,
673        cb: &mut F,
674    ) -> Option<MitoRegionRef> {
675        match self
676            .get_region(region_id)
677            .context(RegionNotFoundSnafu { region_id })
678        {
679            Ok(region) => Some(region),
680            Err(e) => {
681                cb.on_failure(e);
682                None
683            }
684        }
685    }
686
687    /// Gets writable region by region id.
688    ///
689    /// Calls the callback if the region does not exist or is readonly.
690    pub(crate) fn writable_region_or<F: OnFailure>(
691        &self,
692        region_id: RegionId,
693        cb: &mut F,
694    ) -> Option<MitoRegionRef> {
695        match self.writable_region(region_id) {
696            Ok(region) => Some(region),
697            Err(e) => {
698                cb.on_failure(e);
699                None
700            }
701        }
702    }
703
704    /// Gets flushable region by region id.
705    ///
706    /// Returns error if the region does not exist or is not operable.
707    fn flushable_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
708        let region = self
709            .get_region(region_id)
710            .context(RegionNotFoundSnafu { region_id })?;
711        ensure!(
712            region.is_flushable(),
713            FlushableRegionStateSnafu {
714                region_id,
715                state: region.state(),
716            }
717        );
718        Ok(region)
719    }
720
721    /// Gets flushable region by region id.
722    ///
723    /// Calls the callback if the region does not exist or is not operable.
724    pub(crate) fn flushable_region_or<F: OnFailure>(
725        &self,
726        region_id: RegionId,
727        cb: &mut F,
728    ) -> Option<MitoRegionRef> {
729        match self.flushable_region(region_id) {
730            Ok(region) => Some(region),
731            Err(e) => {
732                cb.on_failure(e);
733                None
734            }
735        }
736    }
737
738    /// Remove region by id.
739    pub(crate) fn remove_region(&self, region_id: RegionId) {
740        let mut regions = self.regions.write().unwrap();
741        regions.remove(&region_id);
742    }
743
744    /// List all regions.
745    pub(crate) fn list_regions(&self) -> Vec<MitoRegionRef> {
746        let regions = self.regions.read().unwrap();
747        regions.values().cloned().collect()
748    }
749
750    /// Clear the map.
751    pub(crate) fn clear(&self) {
752        self.regions.write().unwrap().clear();
753    }
754}
755
756pub(crate) type RegionMapRef = Arc<RegionMap>;
757
758/// Opening regions
759#[derive(Debug, Default)]
760pub(crate) struct OpeningRegions {
761    regions: RwLock<HashMap<RegionId, Vec<OptionOutputTx>>>,
762}
763
764impl OpeningRegions {
765    /// Registers `sender` for an opening region; Otherwise, it returns `None`.
766    pub(crate) fn wait_for_opening_region(
767        &self,
768        region_id: RegionId,
769        sender: OptionOutputTx,
770    ) -> Option<OptionOutputTx> {
771        let mut regions = self.regions.write().unwrap();
772        match regions.entry(region_id) {
773            Entry::Occupied(mut senders) => {
774                senders.get_mut().push(sender);
775                None
776            }
777            Entry::Vacant(_) => Some(sender),
778        }
779    }
780
781    /// Returns true if the region exists.
782    pub(crate) fn is_region_exists(&self, region_id: RegionId) -> bool {
783        let regions = self.regions.read().unwrap();
784        regions.contains_key(&region_id)
785    }
786
787    /// Inserts a new region into the map.
788    pub(crate) fn insert_sender(&self, region: RegionId, sender: OptionOutputTx) {
789        let mut regions = self.regions.write().unwrap();
790        regions.insert(region, vec![sender]);
791    }
792
793    /// Remove region by id.
794    pub(crate) fn remove_sender(&self, region_id: RegionId) -> Vec<OptionOutputTx> {
795        let mut regions = self.regions.write().unwrap();
796        regions.remove(&region_id).unwrap_or_default()
797    }
798
799    #[cfg(test)]
800    pub(crate) fn sender_len(&self, region_id: RegionId) -> usize {
801        let regions = self.regions.read().unwrap();
802        if let Some(senders) = regions.get(&region_id) {
803            senders.len()
804        } else {
805            0
806        }
807    }
808}
809
810pub(crate) type OpeningRegionsRef = Arc<OpeningRegions>;
811
812/// Manifest stats.
813#[derive(Default, Debug, Clone)]
814pub(crate) struct ManifestStats {
815    total_manifest_size: Arc<AtomicU64>,
816    manifest_version: Arc<AtomicU64>,
817}
818
819impl ManifestStats {
820    fn total_manifest_size(&self) -> u64 {
821        self.total_manifest_size.load(Ordering::Relaxed)
822    }
823
824    fn manifest_version(&self) -> u64 {
825        self.manifest_version.load(Ordering::Relaxed)
826    }
827}
828
829#[cfg(test)]
830mod tests {
831    use std::sync::Arc;
832
833    use crossbeam_utils::atomic::AtomicCell;
834    use store_api::region_engine::RegionRole;
835    use store_api::storage::RegionId;
836
837    use crate::region::{RegionLeaderState, RegionRoleState};
838    use crate::test_util::scheduler_util::SchedulerEnv;
839    use crate::test_util::version_util::VersionControlBuilder;
840
841    #[test]
842    fn test_region_state_lock_free() {
843        assert!(AtomicCell::<RegionRoleState>::is_lock_free());
844    }
845
846    #[tokio::test]
847    async fn test_set_region_state() {
848        let env = SchedulerEnv::new().await;
849        let builder = VersionControlBuilder::new();
850        let version_control = Arc::new(builder.build());
851        let manifest_ctx = env
852            .mock_manifest_context(version_control.current().version.metadata.clone())
853            .await;
854
855        let region_id = RegionId::new(1024, 0);
856        // Leader -> Follower
857        manifest_ctx.set_role(RegionRole::Follower, region_id);
858        assert_eq!(manifest_ctx.state.load(), RegionRoleState::Follower);
859
860        // Follower -> Leader
861        manifest_ctx.set_role(RegionRole::Leader, region_id);
862        assert_eq!(
863            manifest_ctx.state.load(),
864            RegionRoleState::Leader(RegionLeaderState::Writable)
865        );
866
867        // Leader -> Downgrading Leader
868        manifest_ctx.set_role(RegionRole::DowngradingLeader, region_id);
869        assert_eq!(
870            manifest_ctx.state.load(),
871            RegionRoleState::Leader(RegionLeaderState::Downgrading)
872        );
873
874        // Downgrading Leader -> Follower
875        manifest_ctx.set_role(RegionRole::Follower, region_id);
876        assert_eq!(manifest_ctx.state.load(), RegionRoleState::Follower);
877
878        // Can't downgrade from follower (Follower -> Downgrading Leader)
879        manifest_ctx.set_role(RegionRole::DowngradingLeader, region_id);
880        assert_eq!(manifest_ctx.state.load(), RegionRoleState::Follower);
881
882        // Set region role too Downgrading Leader
883        manifest_ctx.set_role(RegionRole::Leader, region_id);
884        manifest_ctx.set_role(RegionRole::DowngradingLeader, region_id);
885        assert_eq!(
886            manifest_ctx.state.load(),
887            RegionRoleState::Leader(RegionLeaderState::Downgrading)
888        );
889
890        // Downgrading Leader -> Leader
891        manifest_ctx.set_role(RegionRole::Leader, region_id);
892        assert_eq!(
893            manifest_ctx.state.load(),
894            RegionRoleState::Leader(RegionLeaderState::Writable)
895        );
896    }
897}