mito2/
region.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Mito region.
16
17pub mod opener;
18pub mod options;
19pub(crate) mod version;
20
21use std::collections::hash_map::Entry;
22use std::collections::HashMap;
23use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
24use std::sync::{Arc, RwLock};
25
26use common_telemetry::{error, info, warn};
27use crossbeam_utils::atomic::AtomicCell;
28use snafu::{ensure, OptionExt};
29use store_api::codec::PrimaryKeyEncoding;
30use store_api::logstore::provider::Provider;
31use store_api::manifest::ManifestVersion;
32use store_api::metadata::RegionMetadataRef;
33use store_api::region_engine::{
34    RegionManifestInfo, RegionRole, RegionStatistic, SettableRegionRoleState,
35};
36use store_api::storage::RegionId;
37
38use crate::access_layer::AccessLayerRef;
39use crate::error::{
40    FlushableRegionStateSnafu, RegionNotFoundSnafu, RegionStateSnafu, RegionTruncatedSnafu, Result,
41    UpdateManifestSnafu,
42};
43use crate::manifest::action::{RegionManifest, RegionMetaAction, RegionMetaActionList};
44use crate::manifest::manager::RegionManifestManager;
45use crate::memtable::MemtableBuilderRef;
46use crate::region::version::{VersionControlRef, VersionRef};
47use crate::request::{OnFailure, OptionOutputTx};
48use crate::sst::file_purger::FilePurgerRef;
49use crate::time_provider::TimeProviderRef;
50
/// Approximate ratio between the WAL size and the memtable size of a region.
///
/// The WAL usage of a region is estimated by multiplying its memtable usage by this factor.
const ESTIMATED_WAL_FACTOR: f32 = 0.42825;
53
/// Disk usage of a region: the region id together with its WAL, SST and manifest usage.
#[derive(Debug)]
pub struct RegionUsage {
    /// Id of the region this usage belongs to.
    pub region_id: RegionId,
    /// Usage of the WAL.
    pub wal_usage: u64,
    /// Usage of the SST files.
    pub sst_usage: u64,
    /// Usage of the manifest.
    pub manifest_usage: u64,
}
62
63impl RegionUsage {
64    pub fn disk_usage(&self) -> u64 {
65        self.wal_usage + self.sst_usage + self.manifest_usage
66    }
67}
68
/// State of a region whose role is leader.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RegionLeaderState {
    /// The region is opened and is writable.
    Writable,
    /// The region is altering.
    Altering,
    /// The region is dropping.
    Dropping,
    /// The region is truncating.
    Truncating,
    /// The region is handling a region edit.
    Editing,
    /// The region is stepping down.
    Downgrading,
}
84
/// Role of a region together with its role-specific state.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RegionRoleState {
    /// The region is a leader in the given [`RegionLeaderState`].
    Leader(RegionLeaderState),
    /// The region is a follower.
    Follower,
}
90
/// Metadata and runtime status of a region.
///
/// Writing and reading a region follow a single-writer-multi-reader rule:
/// - Only the region worker thread this region belongs to can modify the metadata.
/// - Multiple reader threads are allowed to read a specific `version` of a region.
#[derive(Debug)]
pub(crate) struct MitoRegion {
    /// Id of this region.
    ///
    /// Accessing region id from the version control is inconvenient so
    /// we also store it here.
    pub(crate) region_id: RegionId,

    /// Version controller for this region.
    ///
    /// We MUST update the version control inside the write lock of the region manifest manager.
    pub(crate) version_control: VersionControlRef,
    /// SSTs accessor for this region.
    pub(crate) access_layer: AccessLayerRef,
    /// Context to maintain manifest for this region.
    pub(crate) manifest_ctx: ManifestContextRef,
    /// SST file purger.
    pub(crate) file_purger: FilePurgerRef,
    /// The provider of log store.
    pub(crate) provider: Provider,
    /// Last flush time in millis.
    last_flush_millis: AtomicI64,
    /// Last compaction time in millis.
    last_compaction_millis: AtomicI64,
    /// Provider to get current time.
    time_provider: TimeProviderRef,
    /// The topic's latest entry id since the region's last flushing.
    /// **Only used for remote WAL pruning.**
    ///
    /// The value will be updated to the high watermark of the topic
    /// if the region receives a flush request or schedules a periodic flush task
    /// and the region's memtable is empty.
    ///
    /// There are no WAL entries in range `[flushed_entry_id, topic_latest_entry_id]` for the
    /// current region, which means these WAL entries may be able to be pruned up to
    /// `topic_latest_entry_id`.
    pub(crate) topic_latest_entry_id: AtomicU64,
    /// Memtable builder for the region.
    pub(crate) memtable_builder: MemtableBuilderRef,
    /// Manifest stats (total manifest size and manifest version) of the region.
    stats: ManifestStats,
}
137
/// Shared reference to a [`MitoRegion`].
pub(crate) type MitoRegionRef = Arc<MitoRegion>;
139
140impl MitoRegion {
141    /// Stop background managers for this region.
142    pub(crate) async fn stop(&self) {
143        self.manifest_ctx
144            .manifest_manager
145            .write()
146            .await
147            .stop()
148            .await;
149
150        info!(
151            "Stopped region manifest manager, region_id: {}",
152            self.region_id
153        );
154    }
155
156    /// Returns current metadata of the region.
157    pub(crate) fn metadata(&self) -> RegionMetadataRef {
158        let version_data = self.version_control.current();
159        version_data.version.metadata.clone()
160    }
161
162    /// Returns primary key encoding of the region.
163    pub(crate) fn primary_key_encoding(&self) -> PrimaryKeyEncoding {
164        let version_data = self.version_control.current();
165        version_data.version.metadata.primary_key_encoding
166    }
167
168    /// Returns current version of the region.
169    pub(crate) fn version(&self) -> VersionRef {
170        let version_data = self.version_control.current();
171        version_data.version
172    }
173
174    /// Returns last flush timestamp in millis.
175    pub(crate) fn last_flush_millis(&self) -> i64 {
176        self.last_flush_millis.load(Ordering::Relaxed)
177    }
178
179    /// Update flush time to current time.
180    pub(crate) fn update_flush_millis(&self) {
181        let now = self.time_provider.current_time_millis();
182        self.last_flush_millis.store(now, Ordering::Relaxed);
183    }
184
185    /// Return last compaction time in millis.
186    pub(crate) fn last_compaction_millis(&self) -> i64 {
187        self.last_compaction_millis.load(Ordering::Relaxed)
188    }
189
190    /// Update compaction time to now millis.
191    pub(crate) fn update_compaction_millis(&self) {
192        let now = self.time_provider.current_time_millis();
193        self.last_compaction_millis.store(now, Ordering::Relaxed);
194    }
195
196    /// Returns the region dir.
197    pub(crate) fn region_dir(&self) -> &str {
198        self.access_layer.region_dir()
199    }
200
201    /// Returns whether the region is writable.
202    pub(crate) fn is_writable(&self) -> bool {
203        self.manifest_ctx.state.load() == RegionRoleState::Leader(RegionLeaderState::Writable)
204    }
205
206    /// Returns whether the region is flushable.
207    pub(crate) fn is_flushable(&self) -> bool {
208        matches!(
209            self.manifest_ctx.state.load(),
210            RegionRoleState::Leader(RegionLeaderState::Writable)
211                | RegionRoleState::Leader(RegionLeaderState::Downgrading)
212        )
213    }
214
215    /// Returns whether the region is downgrading.
216    pub(crate) fn is_downgrading(&self) -> bool {
217        matches!(
218            self.manifest_ctx.state.load(),
219            RegionRoleState::Leader(RegionLeaderState::Downgrading)
220        )
221    }
222
223    /// Returns whether the region is readonly.
224    pub(crate) fn is_follower(&self) -> bool {
225        self.manifest_ctx.state.load() == RegionRoleState::Follower
226    }
227
228    /// Returns the state of the region.
229    pub(crate) fn state(&self) -> RegionRoleState {
230        self.manifest_ctx.state.load()
231    }
232
233    /// Sets the region role state.
234    pub(crate) fn set_role(&self, next_role: RegionRole) {
235        self.manifest_ctx.set_role(next_role, self.region_id);
236    }
237
238    /// Sets the altering state.
239    /// You should call this method in the worker loop.
240    pub(crate) fn set_altering(&self) -> Result<()> {
241        self.compare_exchange_state(
242            RegionLeaderState::Writable,
243            RegionRoleState::Leader(RegionLeaderState::Altering),
244        )
245    }
246
247    /// Sets the dropping state.
248    /// You should call this method in the worker loop.
249    pub(crate) fn set_dropping(&self) -> Result<()> {
250        self.compare_exchange_state(
251            RegionLeaderState::Writable,
252            RegionRoleState::Leader(RegionLeaderState::Dropping),
253        )
254    }
255
256    /// Sets the truncating state.
257    /// You should call this method in the worker loop.
258    pub(crate) fn set_truncating(&self) -> Result<()> {
259        self.compare_exchange_state(
260            RegionLeaderState::Writable,
261            RegionRoleState::Leader(RegionLeaderState::Truncating),
262        )
263    }
264
265    /// Sets the editing state.
266    /// You should call this method in the worker loop.
267    pub(crate) fn set_editing(&self) -> Result<()> {
268        self.compare_exchange_state(
269            RegionLeaderState::Writable,
270            RegionRoleState::Leader(RegionLeaderState::Editing),
271        )
272    }
273
274    /// Sets the region to readonly gracefully. This acquires the manifest write lock.
275    pub(crate) async fn set_role_state_gracefully(&self, state: SettableRegionRoleState) {
276        let _manager = self.manifest_ctx.manifest_manager.write().await;
277        // We acquires the write lock of the manifest manager to ensure that no one is updating the manifest.
278        // Then we change the state.
279        self.set_role(state.into());
280    }
281
282    /// Switches the region state to `RegionRoleState::Leader(RegionLeaderState::Writable)` if the current state is `expect`.
283    /// Otherwise, logs an error.
284    pub(crate) fn switch_state_to_writable(&self, expect: RegionLeaderState) {
285        if let Err(e) = self
286            .compare_exchange_state(expect, RegionRoleState::Leader(RegionLeaderState::Writable))
287        {
288            error!(e; "failed to switch region state to writable, expect state is {:?}", expect);
289        }
290    }
291
292    /// Returns the region statistic.
293    pub(crate) fn region_statistic(&self) -> RegionStatistic {
294        let version = self.version();
295        let memtables = &version.memtables;
296        let memtable_usage = (memtables.mutable_usage() + memtables.immutables_usage()) as u64;
297
298        let sst_usage = version.ssts.sst_usage();
299        let index_usage = version.ssts.index_usage();
300        let flushed_entry_id = version.flushed_entry_id;
301
302        let wal_usage = self.estimated_wal_usage(memtable_usage);
303        let manifest_usage = self.stats.total_manifest_size();
304        let num_rows = version.ssts.num_rows() + version.memtables.num_rows();
305        let manifest_version = self.stats.manifest_version();
306
307        let topic_latest_entry_id = self.topic_latest_entry_id.load(Ordering::Relaxed);
308
309        RegionStatistic {
310            num_rows,
311            memtable_size: memtable_usage,
312            wal_size: wal_usage,
313            manifest_size: manifest_usage,
314            sst_size: sst_usage,
315            index_size: index_usage,
316            manifest: RegionManifestInfo::Mito {
317                manifest_version,
318                flushed_entry_id,
319            },
320            data_topic_latest_entry_id: topic_latest_entry_id,
321            metadata_topic_latest_entry_id: topic_latest_entry_id,
322        }
323    }
324
325    /// Estimated WAL size in bytes.
326    /// Use the memtables size to estimate the size of wal.
327    fn estimated_wal_usage(&self, memtable_usage: u64) -> u64 {
328        ((memtable_usage as f32) * ESTIMATED_WAL_FACTOR) as u64
329    }
330
331    /// Sets the state of the region to given state if the current state equals to
332    /// the expected.
333    fn compare_exchange_state(
334        &self,
335        expect: RegionLeaderState,
336        state: RegionRoleState,
337    ) -> Result<()> {
338        self.manifest_ctx
339            .state
340            .compare_exchange(RegionRoleState::Leader(expect), state)
341            .map_err(|actual| {
342                RegionStateSnafu {
343                    region_id: self.region_id,
344                    state: actual,
345                    expect: RegionRoleState::Leader(expect),
346                }
347                .build()
348            })?;
349        Ok(())
350    }
351}
352
/// Context to update the region manifest.
///
/// It bundles the manifest manager with the role state of the region so that the state
/// can be checked while the manager is locked.
#[derive(Debug)]
pub(crate) struct ManifestContext {
    /// Manager to maintain manifest for this region.
    manifest_manager: tokio::sync::RwLock<RegionManifestManager>,
    /// The state of the region. The region checks the state before updating
    /// manifest.
    state: AtomicCell<RegionRoleState>,
}
362
363impl ManifestContext {
364    pub(crate) fn new(manager: RegionManifestManager, state: RegionRoleState) -> Self {
365        ManifestContext {
366            manifest_manager: tokio::sync::RwLock::new(manager),
367            state: AtomicCell::new(state),
368        }
369    }
370
371    pub(crate) async fn manifest_version(&self) -> ManifestVersion {
372        self.manifest_manager
373            .read()
374            .await
375            .manifest()
376            .manifest_version
377    }
378
379    pub(crate) async fn has_update(&self) -> Result<bool> {
380        self.manifest_manager.read().await.has_update().await
381    }
382
383    /// Installs the manifest changes from the current version to the target version (inclusive).
384    ///
385    /// Returns installed [RegionManifest].
386    /// **Note**: This function is not guaranteed to install the target version strictly.
387    /// The installed version may be greater than the target version.
388    pub(crate) async fn install_manifest_to(
389        &self,
390        version: ManifestVersion,
391    ) -> Result<Arc<RegionManifest>> {
392        let mut manager = self.manifest_manager.write().await;
393        manager.install_manifest_to(version).await?;
394
395        Ok(manager.manifest())
396    }
397
398    /// Updates the manifest if current state is `expect_state`.
399    pub(crate) async fn update_manifest(
400        &self,
401        expect_state: RegionLeaderState,
402        action_list: RegionMetaActionList,
403    ) -> Result<ManifestVersion> {
404        // Acquires the write lock of the manifest manager.
405        let mut manager = self.manifest_manager.write().await;
406        // Gets current manifest.
407        let manifest = manager.manifest();
408        // Checks state inside the lock. This is to ensure that we won't update the manifest
409        // after `set_readonly_gracefully()` is called.
410        let current_state = self.state.load();
411
412        // If expect_state is not downgrading, the current state must be either `expect_state` or downgrading.
413        //
414        // A downgrading leader rejects user writes but still allows
415        // flushing the memtable and updating the manifest.
416        if expect_state != RegionLeaderState::Downgrading {
417            if current_state == RegionRoleState::Leader(RegionLeaderState::Downgrading) {
418                info!(
419                    "Region {} is in downgrading leader state, updating manifest. state is {:?}",
420                    manifest.metadata.region_id, expect_state
421                );
422            }
423            ensure!(
424                current_state == RegionRoleState::Leader(expect_state)
425                    || current_state == RegionRoleState::Leader(RegionLeaderState::Downgrading),
426                UpdateManifestSnafu {
427                    region_id: manifest.metadata.region_id,
428                    state: current_state,
429                }
430            );
431        } else {
432            ensure!(
433                current_state == RegionRoleState::Leader(expect_state),
434                RegionStateSnafu {
435                    region_id: manifest.metadata.region_id,
436                    state: current_state,
437                    expect: RegionRoleState::Leader(expect_state),
438                }
439            );
440        }
441
442        for action in &action_list.actions {
443            // Checks whether the edit is still applicable.
444            let RegionMetaAction::Edit(edit) = &action else {
445                continue;
446            };
447
448            // Checks whether the region is truncated.
449            let Some(truncated_entry_id) = manifest.truncated_entry_id else {
450                continue;
451            };
452
453            // This is an edit from flush.
454            if let Some(flushed_entry_id) = edit.flushed_entry_id {
455                ensure!(
456                    truncated_entry_id < flushed_entry_id,
457                    RegionTruncatedSnafu {
458                        region_id: manifest.metadata.region_id,
459                    }
460                );
461            }
462
463            // This is an edit from compaction.
464            if !edit.files_to_remove.is_empty() {
465                // Input files of the compaction task has been truncated.
466                for file in &edit.files_to_remove {
467                    ensure!(
468                        manifest.files.contains_key(&file.file_id),
469                        RegionTruncatedSnafu {
470                            region_id: manifest.metadata.region_id,
471                        }
472                    );
473                }
474            }
475        }
476
477        // Now we can update the manifest.
478        let version = manager.update(action_list).await.inspect_err(
479            |e| error!(e; "Failed to update manifest, region_id: {}", manifest.metadata.region_id),
480        )?;
481
482        if self.state.load() == RegionRoleState::Follower {
483            warn!(
484                "Region {} becomes follower while updating manifest which may cause inconsistency, manifest version: {version}",
485                manifest.metadata.region_id
486            );
487        }
488
489        Ok(version)
490    }
491
492    /// Sets the [`RegionRole`].
493    ///
494    /// ```
495    ///     +------------------------------------------+
496    ///     |                      +-----------------+ |
497    ///     |                      |                 | |
498    /// +---+------+       +-------+-----+        +--v-v---+
499    /// | Follower |       | Downgrading |        | Leader |
500    /// +---^-^----+       +-----+-^-----+        +--+-+---+
501    ///     | |                  | |                 | |
502    ///     | +------------------+ +-----------------+ |
503    ///     +------------------------------------------+
504    ///
505    /// Transition:
506    /// - Follower -> Leader
507    /// - Downgrading Leader -> Leader
508    /// - Leader -> Follower
509    /// - Downgrading Leader -> Follower
510    /// - Leader -> Downgrading Leader
511    ///
512    /// ```
513    pub(crate) fn set_role(&self, next_role: RegionRole, region_id: RegionId) {
514        match next_role {
515            RegionRole::Follower => {
516                match self.state.fetch_update(|state| {
517                    if !matches!(state, RegionRoleState::Follower) {
518                        Some(RegionRoleState::Follower)
519                    } else {
520                        None
521                    }
522                }) {
523                    Ok(state) => info!(
524                        "Convert region {} to follower, previous role state: {:?}",
525                        region_id, state
526                    ),
527                    Err(state) => {
528                        if state != RegionRoleState::Follower {
529                            warn!(
530                                "Failed to convert region {} to follower, current role state: {:?}",
531                                region_id, state
532                            )
533                        }
534                    }
535                }
536            }
537            RegionRole::Leader => {
538                match self.state.fetch_update(|state| {
539                    if matches!(
540                        state,
541                        RegionRoleState::Follower
542                            | RegionRoleState::Leader(RegionLeaderState::Downgrading)
543                    ) {
544                        Some(RegionRoleState::Leader(RegionLeaderState::Writable))
545                    } else {
546                        None
547                    }
548                }) {
549                    Ok(state) => info!(
550                        "Convert region {} to leader, previous role state: {:?}",
551                        region_id, state
552                    ),
553                    Err(state) => {
554                        if state != RegionRoleState::Leader(RegionLeaderState::Writable) {
555                            warn!(
556                                "Failed to convert region {} to leader, current role state: {:?}",
557                                region_id, state
558                            )
559                        }
560                    }
561                }
562            }
563            RegionRole::DowngradingLeader => {
564                match self.state.compare_exchange(
565                    RegionRoleState::Leader(RegionLeaderState::Writable),
566                    RegionRoleState::Leader(RegionLeaderState::Downgrading),
567                ) {
568                    Ok(state) => info!(
569                        "Convert region {} to downgrading region, previous role state: {:?}",
570                        region_id, state
571                    ),
572                    Err(state) => {
573                        if state != RegionRoleState::Leader(RegionLeaderState::Downgrading) {
574                            warn!(
575                                "Failed to convert region {} to downgrading leader, current role state: {:?}",
576                                region_id, state
577                            )
578                        }
579                    }
580                }
581            }
582        }
583    }
584}
585
#[cfg(test)]
impl ManifestContext {
    /// Returns the current manifest of the manifest manager (test only).
    pub(crate) async fn manifest(&self) -> Arc<crate::manifest::action::RegionManifest> {
        let manager = self.manifest_manager.read().await;
        manager.manifest()
    }
}
592
/// Shared reference to a [`ManifestContext`].
pub(crate) type ManifestContextRef = Arc<ManifestContext>;
594
/// Regions indexed by ids.
#[derive(Debug, Default)]
pub(crate) struct RegionMap {
    /// Map from region id to the region, guarded by a read-write lock.
    regions: RwLock<HashMap<RegionId, MitoRegionRef>>,
}
600
601impl RegionMap {
602    /// Returns true if the region exists.
603    pub(crate) fn is_region_exists(&self, region_id: RegionId) -> bool {
604        let regions = self.regions.read().unwrap();
605        regions.contains_key(&region_id)
606    }
607
608    /// Inserts a new region into the map.
609    pub(crate) fn insert_region(&self, region: MitoRegionRef) {
610        let mut regions = self.regions.write().unwrap();
611        regions.insert(region.region_id, region);
612    }
613
614    /// Gets region by region id.
615    pub(crate) fn get_region(&self, region_id: RegionId) -> Option<MitoRegionRef> {
616        let regions = self.regions.read().unwrap();
617        regions.get(&region_id).cloned()
618    }
619
620    /// Gets writable region by region id.
621    ///
622    /// Returns error if the region does not exist or is readonly.
623    pub(crate) fn writable_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
624        let region = self
625            .get_region(region_id)
626            .context(RegionNotFoundSnafu { region_id })?;
627        ensure!(
628            region.is_writable(),
629            RegionStateSnafu {
630                region_id,
631                state: region.state(),
632                expect: RegionRoleState::Leader(RegionLeaderState::Writable),
633            }
634        );
635        Ok(region)
636    }
637
638    /// Gets readonly region by region id.
639    ///
640    /// Returns error if the region does not exist or is writable.
641    pub(crate) fn follower_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
642        let region = self
643            .get_region(region_id)
644            .context(RegionNotFoundSnafu { region_id })?;
645        ensure!(
646            region.is_follower(),
647            RegionStateSnafu {
648                region_id,
649                state: region.state(),
650                expect: RegionRoleState::Follower,
651            }
652        );
653
654        Ok(region)
655    }
656
657    /// Gets region by region id.
658    ///
659    /// Calls the callback if the region does not exist.
660    pub(crate) fn get_region_or<F: OnFailure>(
661        &self,
662        region_id: RegionId,
663        cb: &mut F,
664    ) -> Option<MitoRegionRef> {
665        match self
666            .get_region(region_id)
667            .context(RegionNotFoundSnafu { region_id })
668        {
669            Ok(region) => Some(region),
670            Err(e) => {
671                cb.on_failure(e);
672                None
673            }
674        }
675    }
676
677    /// Gets writable region by region id.
678    ///
679    /// Calls the callback if the region does not exist or is readonly.
680    pub(crate) fn writable_region_or<F: OnFailure>(
681        &self,
682        region_id: RegionId,
683        cb: &mut F,
684    ) -> Option<MitoRegionRef> {
685        match self.writable_region(region_id) {
686            Ok(region) => Some(region),
687            Err(e) => {
688                cb.on_failure(e);
689                None
690            }
691        }
692    }
693
694    /// Gets flushable region by region id.
695    ///
696    /// Returns error if the region does not exist or is not operable.
697    fn flushable_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
698        let region = self
699            .get_region(region_id)
700            .context(RegionNotFoundSnafu { region_id })?;
701        ensure!(
702            region.is_flushable(),
703            FlushableRegionStateSnafu {
704                region_id,
705                state: region.state(),
706            }
707        );
708        Ok(region)
709    }
710
711    /// Gets flushable region by region id.
712    ///
713    /// Calls the callback if the region does not exist or is not operable.
714    pub(crate) fn flushable_region_or<F: OnFailure>(
715        &self,
716        region_id: RegionId,
717        cb: &mut F,
718    ) -> Option<MitoRegionRef> {
719        match self.flushable_region(region_id) {
720            Ok(region) => Some(region),
721            Err(e) => {
722                cb.on_failure(e);
723                None
724            }
725        }
726    }
727
728    /// Remove region by id.
729    pub(crate) fn remove_region(&self, region_id: RegionId) {
730        let mut regions = self.regions.write().unwrap();
731        regions.remove(&region_id);
732    }
733
734    /// List all regions.
735    pub(crate) fn list_regions(&self) -> Vec<MitoRegionRef> {
736        let regions = self.regions.read().unwrap();
737        regions.values().cloned().collect()
738    }
739
740    /// Clear the map.
741    pub(crate) fn clear(&self) {
742        self.regions.write().unwrap().clear();
743    }
744}
745
/// Shared reference to a [`RegionMap`].
pub(crate) type RegionMapRef = Arc<RegionMap>;
747
/// Regions that are being opened, each mapped to the senders registered for it.
#[derive(Debug, Default)]
pub(crate) struct OpeningRegions {
    /// Map from the id of an opening region to its registered senders.
    regions: RwLock<HashMap<RegionId, Vec<OptionOutputTx>>>,
}
753
754impl OpeningRegions {
755    /// Registers `sender` for an opening region; Otherwise, it returns `None`.
756    pub(crate) fn wait_for_opening_region(
757        &self,
758        region_id: RegionId,
759        sender: OptionOutputTx,
760    ) -> Option<OptionOutputTx> {
761        let mut regions = self.regions.write().unwrap();
762        match regions.entry(region_id) {
763            Entry::Occupied(mut senders) => {
764                senders.get_mut().push(sender);
765                None
766            }
767            Entry::Vacant(_) => Some(sender),
768        }
769    }
770
771    /// Returns true if the region exists.
772    pub(crate) fn is_region_exists(&self, region_id: RegionId) -> bool {
773        let regions = self.regions.read().unwrap();
774        regions.contains_key(&region_id)
775    }
776
777    /// Inserts a new region into the map.
778    pub(crate) fn insert_sender(&self, region: RegionId, sender: OptionOutputTx) {
779        let mut regions = self.regions.write().unwrap();
780        regions.insert(region, vec![sender]);
781    }
782
783    /// Remove region by id.
784    pub(crate) fn remove_sender(&self, region_id: RegionId) -> Vec<OptionOutputTx> {
785        let mut regions = self.regions.write().unwrap();
786        regions.remove(&region_id).unwrap_or_default()
787    }
788
789    #[cfg(test)]
790    pub(crate) fn sender_len(&self, region_id: RegionId) -> usize {
791        let regions = self.regions.read().unwrap();
792        if let Some(senders) = regions.get(&region_id) {
793            senders.len()
794        } else {
795            0
796        }
797    }
798}
799
/// Shared reference to [`OpeningRegions`].
pub(crate) type OpeningRegionsRef = Arc<OpeningRegions>;
801
/// Manifest stats.
///
/// The counters live behind `Arc`s, so clones of the stats observe the same values.
#[derive(Default, Debug, Clone)]
pub(crate) struct ManifestStats {
    /// Total size of the manifest.
    total_manifest_size: Arc<AtomicU64>,
    /// Version of the manifest.
    manifest_version: Arc<AtomicU64>,
}

impl ManifestStats {
    /// Returns the current value of the total manifest size counter.
    fn total_manifest_size(&self) -> u64 {
        let Self {
            total_manifest_size,
            ..
        } = self;
        total_manifest_size.load(Ordering::Relaxed)
    }

    /// Returns the current value of the manifest version counter.
    fn manifest_version(&self) -> u64 {
        let Self {
            manifest_version, ..
        } = self;
        manifest_version.load(Ordering::Relaxed)
    }
}
818
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use crossbeam_utils::atomic::AtomicCell;
    use store_api::region_engine::RegionRole;
    use store_api::storage::RegionId;

    use crate::region::{RegionLeaderState, RegionRoleState};
    use crate::test_util::scheduler_util::SchedulerEnv;
    use crate::test_util::version_util::VersionControlBuilder;

    /// Ensures that `AtomicCell<RegionRoleState>` is lock free.
    #[test]
    fn test_region_state_lock_free() {
        assert!(AtomicCell::<RegionRoleState>::is_lock_free());
    }

    /// Walks through the role transitions of `ManifestContext::set_role()`.
    #[tokio::test]
    async fn test_set_region_state() {
        let env = SchedulerEnv::new().await;
        let builder = VersionControlBuilder::new();
        let version_control = Arc::new(builder.build());
        let manifest_ctx = env
            .mock_manifest_context(version_control.current().version.metadata.clone())
            .await;

        let region_id = RegionId::new(1024, 0);
        // Leader -> Follower
        manifest_ctx.set_role(RegionRole::Follower, region_id);
        assert_eq!(manifest_ctx.state.load(), RegionRoleState::Follower);

        // Follower -> Leader
        manifest_ctx.set_role(RegionRole::Leader, region_id);
        assert_eq!(
            manifest_ctx.state.load(),
            RegionRoleState::Leader(RegionLeaderState::Writable)
        );

        // Leader -> Downgrading Leader
        manifest_ctx.set_role(RegionRole::DowngradingLeader, region_id);
        assert_eq!(
            manifest_ctx.state.load(),
            RegionRoleState::Leader(RegionLeaderState::Downgrading)
        );

        // Downgrading Leader -> Follower
        manifest_ctx.set_role(RegionRole::Follower, region_id);
        assert_eq!(manifest_ctx.state.load(), RegionRoleState::Follower);

        // Can't downgrade from follower (Follower -> Downgrading Leader)
        manifest_ctx.set_role(RegionRole::DowngradingLeader, region_id);
        assert_eq!(manifest_ctx.state.load(), RegionRoleState::Follower);

        // Sets the region role to Downgrading Leader (Follower -> Leader -> Downgrading Leader)
        manifest_ctx.set_role(RegionRole::Leader, region_id);
        manifest_ctx.set_role(RegionRole::DowngradingLeader, region_id);
        assert_eq!(
            manifest_ctx.state.load(),
            RegionRoleState::Leader(RegionLeaderState::Downgrading)
        );

        // Downgrading Leader -> Leader
        manifest_ctx.set_role(RegionRole::Leader, region_id);
        assert_eq!(
            manifest_ctx.state.load(),
            RegionRoleState::Leader(RegionLeaderState::Writable)
        );
    }
}