mito2/region.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Mito region.

pub mod opener;
pub mod options;
pub(crate) mod version;

use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
use std::sync::{Arc, RwLock};

use common_telemetry::{error, info, warn};
use crossbeam_utils::atomic::AtomicCell;
use snafu::{ensure, OptionExt};
use store_api::codec::PrimaryKeyEncoding;
use store_api::logstore::provider::Provider;
use store_api::metadata::RegionMetadataRef;
use store_api::region_engine::{
    RegionManifestInfo, RegionRole, RegionStatistic, SettableRegionRoleState,
};
use store_api::sst_entry::ManifestSstEntry;
use store_api::storage::{RegionId, SequenceNumber};
use store_api::ManifestVersion;

use crate::access_layer::AccessLayerRef;
use crate::error::{
    FlushableRegionStateSnafu, RegionNotFoundSnafu, RegionStateSnafu, RegionTruncatedSnafu, Result,
    UpdateManifestSnafu,
};
use crate::manifest::action::{RegionManifest, RegionMetaAction, RegionMetaActionList};
use crate::manifest::manager::RegionManifestManager;
use crate::memtable::MemtableBuilderRef;
use crate::region::version::{VersionControlRef, VersionRef};
use crate::request::{OnFailure, OptionOutputTx};
use crate::sst::file_purger::FilePurgerRef;
use crate::sst::location::{index_file_path, sst_file_path};
use crate::time_provider::TimeProviderRef;

/// Approximate factor used to estimate the WAL size from the memtable usage.
const ESTIMATED_WAL_FACTOR: f32 = 0.42825;

/// Region usage includes the region id and its WAL, SST and manifest usage.
#[derive(Debug)]
pub struct RegionUsage {
    pub region_id: RegionId,
    pub wal_usage: u64,
    pub sst_usage: u64,
    pub manifest_usage: u64,
}

impl RegionUsage {
    pub fn disk_usage(&self) -> u64 {
        self.wal_usage + self.sst_usage + self.manifest_usage
    }
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RegionLeaderState {
    /// The region is opened and is writable.
    Writable,
    /// The region is in staging mode: writable, but without checkpoints or compactions.
    Staging,
    /// The region is altering.
    Altering,
    /// The region is dropping.
    Dropping,
    /// The region is truncating.
    Truncating,
    /// The region is handling a region edit.
    Editing,
    /// The region is downgrading to a follower.
    Downgrading,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RegionRoleState {
    Leader(RegionLeaderState),
    Follower,
}

/// Metadata and runtime status of a region.
///
/// Writing and reading a region follow a single-writer, multi-reader rule:
/// - Only the region worker thread this region belongs to can modify the metadata.
/// - Multiple reader threads are allowed to read a specific `version` of a region.
#[derive(Debug)]
pub struct MitoRegion {
    /// Id of this region.
    ///
    /// Accessing the region id from the version control is inconvenient, so
    /// we also store it here.
    pub(crate) region_id: RegionId,

    /// Version controller for this region.
    ///
    /// We MUST update the version control inside the write lock of the region manifest manager.
    pub(crate) version_control: VersionControlRef,
    /// SST accessor for this region.
    pub(crate) access_layer: AccessLayerRef,
    /// Context to maintain the manifest for this region.
    pub(crate) manifest_ctx: ManifestContextRef,
    /// SST file purger.
    pub(crate) file_purger: FilePurgerRef,
    /// The log store provider.
    pub(crate) provider: Provider,
    /// Last flush time in millis.
    last_flush_millis: AtomicI64,
    /// Last compaction time in millis.
    last_compaction_millis: AtomicI64,
    /// Provider to get the current time.
    time_provider: TimeProviderRef,
    /// The topic's latest entry id since the region's last flush.
    /// **Only used for remote WAL pruning.**
    ///
    /// The value is updated to the latest offset of the topic when the region
    /// receives a flush request or schedules a periodic flush task while its
    /// memtable is empty.
    ///
    /// There are no WAL entries in the range `[flushed_entry_id, topic_latest_entry_id]`
    /// for this region, so its WAL entries may be pruned up to `topic_latest_entry_id`.
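    ///
    /// For example, if `flushed_entry_id` is 10 and `topic_latest_entry_id` is 15,
    /// entries 10 through 15 carry no unflushed data for this region, so a remote
    /// WAL pruner may safely advance past them.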
    pub(crate) topic_latest_entry_id: AtomicU64,
    /// The total bytes written to the region.
    pub(crate) written_bytes: Arc<AtomicU64>,
    /// Memtable builder for the region.
    pub(crate) memtable_builder: MemtableBuilderRef,
    /// Manifest stats.
    stats: ManifestStats,
}

pub type MitoRegionRef = Arc<MitoRegion>;

impl MitoRegion {
    /// Stops background managers for this region.
    pub(crate) async fn stop(&self) {
        self.manifest_ctx
            .manifest_manager
            .write()
            .await
            .stop()
            .await;

        info!(
            "Stopped region manifest manager, region_id: {}",
            self.region_id
        );
    }

    /// Returns the current metadata of the region.
    pub(crate) fn metadata(&self) -> RegionMetadataRef {
        let version_data = self.version_control.current();
        version_data.version.metadata.clone()
    }

    /// Returns the primary key encoding of the region.
    pub(crate) fn primary_key_encoding(&self) -> PrimaryKeyEncoding {
        let version_data = self.version_control.current();
        version_data.version.metadata.primary_key_encoding
    }

    /// Returns the current version of the region.
    pub(crate) fn version(&self) -> VersionRef {
        let version_data = self.version_control.current();
        version_data.version
    }

    /// Returns the last flush timestamp in millis.
    pub(crate) fn last_flush_millis(&self) -> i64 {
        self.last_flush_millis.load(Ordering::Relaxed)
    }

    /// Updates the flush time to the current time.
    pub(crate) fn update_flush_millis(&self) {
        let now = self.time_provider.current_time_millis();
        self.last_flush_millis.store(now, Ordering::Relaxed);
    }

    /// Returns the last compaction time in millis.
    pub(crate) fn last_compaction_millis(&self) -> i64 {
        self.last_compaction_millis.load(Ordering::Relaxed)
    }

    /// Updates the compaction time to the current time.
    pub(crate) fn update_compaction_millis(&self) {
        let now = self.time_provider.current_time_millis();
        self.last_compaction_millis.store(now, Ordering::Relaxed);
    }

    /// Returns the table dir.
    pub(crate) fn table_dir(&self) -> &str {
        self.access_layer.table_dir()
    }

    /// Returns whether the region is writable.
    pub(crate) fn is_writable(&self) -> bool {
        matches!(
            self.manifest_ctx.state.load(),
            RegionRoleState::Leader(RegionLeaderState::Writable)
                | RegionRoleState::Leader(RegionLeaderState::Staging)
        )
    }

    /// Returns whether the region is flushable.
    pub(crate) fn is_flushable(&self) -> bool {
        matches!(
            self.manifest_ctx.state.load(),
            RegionRoleState::Leader(RegionLeaderState::Writable)
                | RegionRoleState::Leader(RegionLeaderState::Staging)
                | RegionRoleState::Leader(RegionLeaderState::Downgrading)
        )
    }

    /// Returns whether the region is downgrading.
    pub(crate) fn is_downgrading(&self) -> bool {
        matches!(
            self.manifest_ctx.state.load(),
            RegionRoleState::Leader(RegionLeaderState::Downgrading)
        )
    }

    /// Returns whether the region is in staging mode.
    #[allow(dead_code)]
    pub(crate) fn is_staging(&self) -> bool {
        self.manifest_ctx.state.load() == RegionRoleState::Leader(RegionLeaderState::Staging)
    }

    pub fn region_id(&self) -> RegionId {
        self.region_id
    }

    pub fn find_committed_sequence(&self) -> SequenceNumber {
        self.version_control.committed_sequence()
    }

    /// Returns whether the region is a follower (readonly).
    pub fn is_follower(&self) -> bool {
        self.manifest_ctx.state.load() == RegionRoleState::Follower
    }

    /// Returns the state of the region.
    pub(crate) fn state(&self) -> RegionRoleState {
        self.manifest_ctx.state.load()
    }

    /// Sets the region role state.
    pub(crate) fn set_role(&self, next_role: RegionRole) {
        self.manifest_ctx.set_role(next_role, self.region_id);
    }

    /// Sets the altering state.
    /// You should call this method in the worker loop.
    pub(crate) fn set_altering(&self) -> Result<()> {
        self.compare_exchange_state(
            RegionLeaderState::Writable,
            RegionRoleState::Leader(RegionLeaderState::Altering),
        )
    }

    /// Sets the dropping state.
    /// You should call this method in the worker loop.
    pub(crate) fn set_dropping(&self) -> Result<()> {
        self.compare_exchange_state(
            RegionLeaderState::Writable,
            RegionRoleState::Leader(RegionLeaderState::Dropping),
        )
    }

    /// Sets the truncating state.
    /// You should call this method in the worker loop.
    pub(crate) fn set_truncating(&self) -> Result<()> {
        self.compare_exchange_state(
            RegionLeaderState::Writable,
            RegionRoleState::Leader(RegionLeaderState::Truncating),
        )
    }

    /// Sets the editing state.
    /// You should call this method in the worker loop.
    pub(crate) fn set_editing(&self) -> Result<()> {
        self.compare_exchange_state(
            RegionLeaderState::Writable,
            RegionRoleState::Leader(RegionLeaderState::Editing),
        )
    }

    /// Sets the staging state.
    /// You should call this method in the worker loop.
    /// Transitions from Writable to Staging state.
    pub(crate) fn set_staging(&self) -> Result<()> {
        self.compare_exchange_state(
            RegionLeaderState::Writable,
            RegionRoleState::Leader(RegionLeaderState::Staging),
        )
    }

    /// Exits the staging state back to writable.
    /// You should call this method in the worker loop.
    /// Transitions from Staging to Writable state.
    pub(crate) fn exit_staging(&self) -> Result<()> {
        self.compare_exchange_state(
            RegionLeaderState::Staging,
            RegionRoleState::Leader(RegionLeaderState::Writable),
        )
    }

    /// Sets the region role state gracefully. This acquires the manifest write lock.
    pub(crate) async fn set_role_state_gracefully(
        &self,
        state: SettableRegionRoleState,
    ) -> Result<()> {
        let _manager = self.manifest_ctx.manifest_manager.write().await;
        let current_state = self.state();

        match state {
            SettableRegionRoleState::Leader => {
                // Exit staging mode and return to normal writable leader
                // Only allowed from staging state
                match current_state {
                    RegionRoleState::Leader(RegionLeaderState::Staging) => {
                        info!("Exiting staging mode for region {}", self.region_id);
                        self.exit_staging()?;
                    }
                    RegionRoleState::Leader(RegionLeaderState::Writable) => {
                        // Already in desired state - no-op
                        info!("Region {} already in normal leader mode", self.region_id);
                    }
                    _ => {
                        // Only staging -> leader transition is allowed
                        return Err(RegionStateSnafu {
                            region_id: self.region_id,
                            state: current_state,
                            expect: RegionRoleState::Leader(RegionLeaderState::Staging),
                        }
                        .build());
                    }
                }
            }

            SettableRegionRoleState::StagingLeader => {
                // Enter staging mode from normal writable leader
                // Only allowed from writable leader state
                match current_state {
                    RegionRoleState::Leader(RegionLeaderState::Writable) => {
                        info!("Entering staging mode for region {}", self.region_id);
                        self.set_staging()?;
                    }
                    RegionRoleState::Leader(RegionLeaderState::Staging) => {
                        // Already in desired state - no-op
                        info!("Region {} already in staging mode", self.region_id);
                    }
                    _ => {
                        return Err(RegionStateSnafu {
                            region_id: self.region_id,
                            state: current_state,
                            expect: RegionRoleState::Leader(RegionLeaderState::Writable),
                        }
                        .build());
                    }
                }
            }

            SettableRegionRoleState::Follower => {
                // Make this region a follower
                match current_state {
                    RegionRoleState::Leader(RegionLeaderState::Staging) => {
                        info!(
                            "Exiting staging and demoting region {} to follower",
                            self.region_id
                        );
                        self.exit_staging()?;
                        self.set_role(RegionRole::Follower);
                    }
                    RegionRoleState::Leader(_) => {
                        info!("Demoting region {} from leader to follower", self.region_id);
                        self.set_role(RegionRole::Follower);
                    }
                    RegionRoleState::Follower => {
                        // Already in desired state - no-op
                        info!("Region {} already in follower mode", self.region_id);
                    }
                }
            }

            SettableRegionRoleState::DowngradingLeader => {
                // Downgrade this region to a downgrading leader
                match current_state {
                    RegionRoleState::Leader(RegionLeaderState::Staging) => {
                        info!(
                            "Exiting staging and entering downgrade for region {}",
                            self.region_id
                        );
                        self.exit_staging()?;
                        self.set_role(RegionRole::DowngradingLeader);
                    }
                    RegionRoleState::Leader(RegionLeaderState::Writable) => {
                        info!("Starting downgrade for region {}", self.region_id);
                        self.set_role(RegionRole::DowngradingLeader);
                    }
                    RegionRoleState::Leader(RegionLeaderState::Downgrading) => {
                        // Already in desired state - no-op
                        info!("Region {} already in downgrading mode", self.region_id);
                    }
                    _ => {
                        warn!(
                            "Cannot start downgrade for region {} from state {:?}",
                            self.region_id, current_state
                        );
                    }
                }
            }
        }

        Ok(())
    }

    /// Switches the region state to `RegionRoleState::Leader(RegionLeaderState::Writable)` if the current state is `expect`.
    /// Otherwise, logs an error.
    pub(crate) fn switch_state_to_writable(&self, expect: RegionLeaderState) {
        if let Err(e) = self
            .compare_exchange_state(expect, RegionRoleState::Leader(RegionLeaderState::Writable))
        {
            error!(e; "failed to switch region state to writable, expect state is {:?}", expect);
        }
    }

    /// Returns the region statistic.
    pub(crate) fn region_statistic(&self) -> RegionStatistic {
        let version = self.version();
        let memtables = &version.memtables;
        let memtable_usage = (memtables.mutable_usage() + memtables.immutables_usage()) as u64;

        let sst_usage = version.ssts.sst_usage();
        let index_usage = version.ssts.index_usage();
        let flushed_entry_id = version.flushed_entry_id;

        let wal_usage = self.estimated_wal_usage(memtable_usage);
        let manifest_usage = self.stats.total_manifest_size();
        let num_rows = version.ssts.num_rows() + version.memtables.num_rows();
        let num_files = version.ssts.num_files();
        let manifest_version = self.stats.manifest_version();

        let topic_latest_entry_id = self.topic_latest_entry_id.load(Ordering::Relaxed);
        let written_bytes = self.written_bytes.load(Ordering::Relaxed);

        RegionStatistic {
            num_rows,
            memtable_size: memtable_usage,
            wal_size: wal_usage,
            manifest_size: manifest_usage,
            sst_size: sst_usage,
            sst_num: num_files,
            index_size: index_usage,
            manifest: RegionManifestInfo::Mito {
                manifest_version,
                flushed_entry_id,
            },
            data_topic_latest_entry_id: topic_latest_entry_id,
            metadata_topic_latest_entry_id: topic_latest_entry_id,
            written_bytes,
        }
    }

    /// Estimates the WAL size in bytes from the memtable usage.
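    ///
    /// A rough worked example (illustrative only): with about 100 MiB of
    /// memtable data the estimate is `100 MiB * 0.42825 ≈ 42.8 MiB` of WAL.
    ///
    /// ```ignore
    /// // Sketch mirroring the body below.
    /// let memtable_usage: u64 = 100 * 1024 * 1024;
    /// let wal_usage = ((memtable_usage as f32) * ESTIMATED_WAL_FACTOR) as u64;
    /// assert!(wal_usage < memtable_usage);
    /// ```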
    fn estimated_wal_usage(&self, memtable_usage: u64) -> u64 {
        ((memtable_usage as f32) * ESTIMATED_WAL_FACTOR) as u64
    }

    /// Sets the region to the given state if the current state equals the
    /// expected one.
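    ///
    /// A minimal sketch of how the setters above drive a transition
    /// (mirrors `set_altering`):
    ///
    /// ```ignore
    /// // Succeeds only if the region is currently a writable leader.
    /// region.compare_exchange_state(
    ///     RegionLeaderState::Writable,
    ///     RegionRoleState::Leader(RegionLeaderState::Altering),
    /// )?;
    /// ```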
    fn compare_exchange_state(
        &self,
        expect: RegionLeaderState,
        state: RegionRoleState,
    ) -> Result<()> {
        self.manifest_ctx
            .state
            .compare_exchange(RegionRoleState::Leader(expect), state)
            .map_err(|actual| {
                RegionStateSnafu {
                    region_id: self.region_id,
                    state: actual,
                    expect: RegionRoleState::Leader(expect),
                }
                .build()
            })?;
        Ok(())
    }

    /// Returns the SST entries of the region.
    pub fn manifest_sst_entries(&self) -> Vec<ManifestSstEntry> {
        let table_dir = self.table_dir();
        let path_type = self.access_layer.path_type();
        self.version()
            .ssts
            .levels()
            .iter()
            .flat_map(|level| {
                level.files().map(|file| {
                    let meta = file.meta_ref();
                    let region_id = meta.region_id;
                    let (index_file_path, index_file_size) = if meta.index_file_size > 0 {
                        let index_file_path = index_file_path(table_dir, meta.file_id(), path_type);
                        (Some(index_file_path), Some(meta.index_file_size))
                    } else {
                        (None, None)
                    };
                    ManifestSstEntry {
                        table_dir: table_dir.to_string(),
                        region_id,
                        table_id: region_id.table_id(),
                        region_number: region_id.region_number(),
                        region_group: region_id.region_group(),
                        region_sequence: region_id.region_sequence(),
                        file_id: meta.file_id.to_string(),
                        level: meta.level,
                        file_path: sst_file_path(table_dir, meta.file_id(), path_type),
                        file_size: meta.file_size,
                        index_file_path,
                        index_file_size,
                        num_rows: meta.num_rows,
                        num_row_groups: meta.num_row_groups,
                        min_ts: meta.time_range.0,
                        max_ts: meta.time_range.1,
                        sequence: meta.sequence.map(|s| s.get()),
                    }
                })
            })
            .collect()
    }
}

/// Context to update the region manifest.
#[derive(Debug)]
pub(crate) struct ManifestContext {
    /// Manager to maintain manifest for this region.
    pub(crate) manifest_manager: tokio::sync::RwLock<RegionManifestManager>,
    /// The state of the region. The region checks the state before updating
    /// manifest.
    state: AtomicCell<RegionRoleState>,
}

impl ManifestContext {
    pub(crate) fn new(manager: RegionManifestManager, state: RegionRoleState) -> Self {
        ManifestContext {
            manifest_manager: tokio::sync::RwLock::new(manager),
            state: AtomicCell::new(state),
        }
    }

    pub(crate) async fn manifest_version(&self) -> ManifestVersion {
        self.manifest_manager
            .read()
            .await
            .manifest()
            .manifest_version
    }

    pub(crate) async fn has_update(&self) -> Result<bool> {
        self.manifest_manager.read().await.has_update().await
    }

    /// Returns the current region role state.
    pub(crate) fn current_state(&self) -> RegionRoleState {
        self.state.load()
    }

    /// Installs the manifest changes from the current version to the target version (inclusive).
    ///
    /// Returns the installed [RegionManifest].
    /// **Note**: This function is not guaranteed to install the target version strictly.
    /// The installed version may be greater than the target version.
    pub(crate) async fn install_manifest_to(
        &self,
        version: ManifestVersion,
    ) -> Result<Arc<RegionManifest>> {
        let mut manager = self.manifest_manager.write().await;
        manager.install_manifest_to(version).await?;

        Ok(manager.manifest())
    }

    /// Updates the manifest if current state is `expect_state`.
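    ///
    /// A hedged usage sketch of a flush-style caller (edit construction elided;
    /// assumes the `with_action` constructor on [RegionMetaActionList]):
    ///
    /// ```ignore
    /// let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit));
    /// let version = manifest_ctx
    ///     .update_manifest(RegionLeaderState::Writable, action_list)
    ///     .await?;
    /// ```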
    pub(crate) async fn update_manifest(
        &self,
        expect_state: RegionLeaderState,
        action_list: RegionMetaActionList,
    ) -> Result<ManifestVersion> {
        // Acquires the write lock of the manifest manager.
        let mut manager = self.manifest_manager.write().await;
        // Gets current manifest.
        let manifest = manager.manifest();
        // Checks the state inside the lock. This is to ensure that we won't update the manifest
        // after `set_role_state_gracefully()` is called.
        let current_state = self.state.load();

        // If expect_state is not downgrading, the current state must be either `expect_state` or downgrading.
        //
        // A downgrading leader rejects user writes but still allows
        // flushing the memtable and updating the manifest.
        if expect_state != RegionLeaderState::Downgrading {
            if current_state == RegionRoleState::Leader(RegionLeaderState::Downgrading) {
                info!(
                    "Region {} is in downgrading leader state, updating manifest. state is {:?}",
                    manifest.metadata.region_id, expect_state
                );
            }
            ensure!(
                current_state == RegionRoleState::Leader(expect_state)
                    || current_state == RegionRoleState::Leader(RegionLeaderState::Downgrading),
                UpdateManifestSnafu {
                    region_id: manifest.metadata.region_id,
                    state: current_state,
                }
            );
        } else {
            ensure!(
                current_state == RegionRoleState::Leader(expect_state),
                RegionStateSnafu {
                    region_id: manifest.metadata.region_id,
                    state: current_state,
                    expect: RegionRoleState::Leader(expect_state),
                }
            );
        }

        for action in &action_list.actions {
            // Checks whether the edit is still applicable.
            let RegionMetaAction::Edit(edit) = &action else {
                continue;
            };

            // Checks whether the region is truncated.
            let Some(truncated_entry_id) = manifest.truncated_entry_id else {
                continue;
            };

            // This is an edit from flush.
            if let Some(flushed_entry_id) = edit.flushed_entry_id {
                ensure!(
                    truncated_entry_id < flushed_entry_id,
                    RegionTruncatedSnafu {
                        region_id: manifest.metadata.region_id,
                    }
                );
            }

            // This is an edit from compaction.
            if !edit.files_to_remove.is_empty() {
                // Ensures the input files of the compaction task have not been truncated.
                for file in &edit.files_to_remove {
                    ensure!(
                        manifest.files.contains_key(&file.file_id),
                        RegionTruncatedSnafu {
                            region_id: manifest.metadata.region_id,
                        }
                    );
                }
            }
        }

        // Now we can update the manifest.
        let version = manager.update(action_list, current_state).await.inspect_err(
            |e| error!(e; "Failed to update manifest, region_id: {}", manifest.metadata.region_id),
        )?;

        if self.state.load() == RegionRoleState::Follower {
            warn!(
                "Region {} becomes follower while updating manifest which may cause inconsistency, manifest version: {version}",
                manifest.metadata.region_id
            );
        }

        Ok(version)
    }

    /// Sets the [`RegionRole`].
    ///
    /// ```
    ///     +------------------------------------------+
    ///     |                      +-----------------+ |
    ///     |                      |                 | |
    /// +---+------+       +-------+-----+        +--v-v---+
    /// | Follower |       | Downgrading |        | Leader |
    /// +---^-^----+       +-----+-^-----+        +--+-+---+
    ///     | |                  | |                 | |
    ///     | +------------------+ +-----------------+ |
    ///     +------------------------------------------+
    ///
    /// Transition:
    /// - Follower -> Leader
    /// - Downgrading Leader -> Leader
    /// - Leader -> Follower
    /// - Downgrading Leader -> Follower
    /// - Leader -> Downgrading Leader
    ///
    /// ```
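    ///
    /// Transitions outside this diagram are rejected: for example, converting a
    /// follower to a downgrading leader is a no-op that only logs a warning,
    /// since the compare-and-swap below requires a writable leader.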
    pub(crate) fn set_role(&self, next_role: RegionRole, region_id: RegionId) {
        match next_role {
            RegionRole::Follower => {
                match self.state.fetch_update(|state| {
                    if !matches!(state, RegionRoleState::Follower) {
                        Some(RegionRoleState::Follower)
                    } else {
                        None
                    }
                }) {
                    Ok(state) => info!(
                        "Convert region {} to follower, previous role state: {:?}",
                        region_id, state
                    ),
                    Err(state) => {
                        if state != RegionRoleState::Follower {
                            warn!(
                                "Failed to convert region {} to follower, current role state: {:?}",
                                region_id, state
                            )
                        }
                    }
                }
            }
            RegionRole::Leader => {
                match self.state.fetch_update(|state| {
                    if matches!(
                        state,
                        RegionRoleState::Follower
                            | RegionRoleState::Leader(RegionLeaderState::Downgrading)
                    ) {
                        Some(RegionRoleState::Leader(RegionLeaderState::Writable))
                    } else {
                        None
                    }
                }) {
                    Ok(state) => info!(
                        "Convert region {} to leader, previous role state: {:?}",
                        region_id, state
                    ),
                    Err(state) => {
                        if state != RegionRoleState::Leader(RegionLeaderState::Writable) {
                            warn!(
                                "Failed to convert region {} to leader, current role state: {:?}",
                                region_id, state
                            )
                        }
                    }
                }
            }
            RegionRole::DowngradingLeader => {
                match self.state.compare_exchange(
                    RegionRoleState::Leader(RegionLeaderState::Writable),
                    RegionRoleState::Leader(RegionLeaderState::Downgrading),
                ) {
                    Ok(state) => info!(
                        "Convert region {} to downgrading region, previous role state: {:?}",
                        region_id, state
                    ),
                    Err(state) => {
                        if state != RegionRoleState::Leader(RegionLeaderState::Downgrading) {
                            warn!(
                                "Failed to convert region {} to downgrading leader, current role state: {:?}",
                                region_id, state
                            )
                        }
                    }
                }
            }
        }
    }

    pub(crate) async fn manifest(&self) -> Arc<RegionManifest> {
        self.manifest_manager.read().await.manifest()
    }
}

pub(crate) type ManifestContextRef = Arc<ManifestContext>;

/// Regions indexed by ids.
#[derive(Debug, Default)]
pub(crate) struct RegionMap {
    regions: RwLock<HashMap<RegionId, MitoRegionRef>>,
}

impl RegionMap {
    /// Returns true if the region exists.
    pub(crate) fn is_region_exists(&self, region_id: RegionId) -> bool {
        let regions = self.regions.read().unwrap();
        regions.contains_key(&region_id)
    }

    /// Inserts a new region into the map.
    pub(crate) fn insert_region(&self, region: MitoRegionRef) {
        let mut regions = self.regions.write().unwrap();
        regions.insert(region.region_id, region);
    }

    /// Gets a region by region id.
    pub(crate) fn get_region(&self, region_id: RegionId) -> Option<MitoRegionRef> {
        let regions = self.regions.read().unwrap();
        regions.get(&region_id).cloned()
    }

    /// Gets a writable region by region id.
    ///
    /// Returns an error if the region does not exist or is readonly.
    pub(crate) fn writable_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
        let region = self
            .get_region(region_id)
            .context(RegionNotFoundSnafu { region_id })?;
        ensure!(
            region.is_writable(),
            RegionStateSnafu {
                region_id,
                state: region.state(),
                expect: RegionRoleState::Leader(RegionLeaderState::Writable),
            }
        );
        Ok(region)
    }

    /// Gets a follower region by region id.
    ///
    /// Returns an error if the region does not exist or is not a follower.
    pub(crate) fn follower_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
        let region = self
            .get_region(region_id)
            .context(RegionNotFoundSnafu { region_id })?;
        ensure!(
            region.is_follower(),
            RegionStateSnafu {
                region_id,
                state: region.state(),
                expect: RegionRoleState::Follower,
            }
        );

        Ok(region)
    }

    /// Gets a region by region id.
    ///
    /// Calls the callback if the region does not exist.
    pub(crate) fn get_region_or<F: OnFailure>(
        &self,
        region_id: RegionId,
        cb: &mut F,
    ) -> Option<MitoRegionRef> {
        match self
            .get_region(region_id)
            .context(RegionNotFoundSnafu { region_id })
        {
            Ok(region) => Some(region),
            Err(e) => {
                cb.on_failure(e);
                None
            }
        }
    }

    /// Gets a writable region by region id.
    ///
    /// Calls the callback if the region does not exist or is readonly.
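    ///
    /// A hedged usage sketch (assumes `req` implements [`OnFailure`]):
    ///
    /// ```ignore
    /// let Some(region) = region_map.writable_region_or(region_id, &mut req) else {
    ///     // The error has already been forwarded to `req` via `on_failure`.
    ///     return;
    /// };
    /// ```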
    pub(crate) fn writable_region_or<F: OnFailure>(
        &self,
        region_id: RegionId,
        cb: &mut F,
    ) -> Option<MitoRegionRef> {
        match self.writable_region(region_id) {
            Ok(region) => Some(region),
            Err(e) => {
                cb.on_failure(e);
                None
            }
        }
    }

    /// Gets a writable non-staging region by region id.
    ///
    /// Returns an error if the region does not exist, is readonly, or is in staging mode.
    pub(crate) fn writable_non_staging_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
        let region = self.writable_region(region_id)?;
        if region.is_staging() {
            return Err(RegionStateSnafu {
                region_id,
                state: region.state(),
                expect: RegionRoleState::Leader(RegionLeaderState::Writable),
            }
            .build());
        }
        Ok(region)
    }

    /// Gets a flushable region by region id.
    ///
    /// Returns an error if the region does not exist or is not operable.
    fn flushable_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
        let region = self
            .get_region(region_id)
            .context(RegionNotFoundSnafu { region_id })?;
        ensure!(
            region.is_flushable(),
            FlushableRegionStateSnafu {
                region_id,
                state: region.state(),
            }
        );
        Ok(region)
    }

    /// Gets a flushable region by region id.
    ///
    /// Calls the callback if the region does not exist or is not operable.
    pub(crate) fn flushable_region_or<F: OnFailure>(
        &self,
        region_id: RegionId,
        cb: &mut F,
    ) -> Option<MitoRegionRef> {
        match self.flushable_region(region_id) {
            Ok(region) => Some(region),
            Err(e) => {
                cb.on_failure(e);
                None
            }
        }
    }

    /// Removes a region by id.
    pub(crate) fn remove_region(&self, region_id: RegionId) {
        let mut regions = self.regions.write().unwrap();
        regions.remove(&region_id);
    }

    /// Lists all regions.
    pub(crate) fn list_regions(&self) -> Vec<MitoRegionRef> {
        let regions = self.regions.read().unwrap();
        regions.values().cloned().collect()
    }

    /// Clears the map.
    pub(crate) fn clear(&self) {
        self.regions.write().unwrap().clear();
    }
}

pub(crate) type RegionMapRef = Arc<RegionMap>;

/// Regions that are being opened.
#[derive(Debug, Default)]
pub(crate) struct OpeningRegions {
    regions: RwLock<HashMap<RegionId, Vec<OptionOutputTx>>>,
}

impl OpeningRegions {
    /// Registers `sender` for an opening region and returns `None` if an open is
    /// already in flight for `region_id`; otherwise, returns the `sender` back.
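    ///
    /// A usage sketch (hypothetical caller; the actual open task is driven elsewhere):
    ///
    /// ```ignore
    /// if let Some(sender) = opening_regions.wait_for_opening_region(region_id, sender) {
    ///     // No open is in flight yet: register the sender and start opening.
    ///     opening_regions.insert_sender(region_id, sender);
    ///     // ... schedule the open task ...
    /// }
    /// ```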
    pub(crate) fn wait_for_opening_region(
        &self,
        region_id: RegionId,
        sender: OptionOutputTx,
    ) -> Option<OptionOutputTx> {
        let mut regions = self.regions.write().unwrap();
        match regions.entry(region_id) {
            Entry::Occupied(mut senders) => {
                senders.get_mut().push(sender);
                None
            }
            Entry::Vacant(_) => Some(sender),
        }
    }

    /// Returns true if the region exists.
    pub(crate) fn is_region_exists(&self, region_id: RegionId) -> bool {
        let regions = self.regions.read().unwrap();
        regions.contains_key(&region_id)
    }

    /// Registers the first `sender` for an opening region.
    pub(crate) fn insert_sender(&self, region: RegionId, sender: OptionOutputTx) {
        let mut regions = self.regions.write().unwrap();
        regions.insert(region, vec![sender]);
    }

    /// Removes and returns the senders registered for the region.
    pub(crate) fn remove_sender(&self, region_id: RegionId) -> Vec<OptionOutputTx> {
        let mut regions = self.regions.write().unwrap();
        regions.remove(&region_id).unwrap_or_default()
    }

    #[cfg(test)]
    pub(crate) fn sender_len(&self, region_id: RegionId) -> usize {
        let regions = self.regions.read().unwrap();
        if let Some(senders) = regions.get(&region_id) {
            senders.len()
        } else {
            0
        }
    }
}

pub(crate) type OpeningRegionsRef = Arc<OpeningRegions>;

/// Manifest stats.
#[derive(Default, Debug, Clone)]
pub(crate) struct ManifestStats {
    total_manifest_size: Arc<AtomicU64>,
    manifest_version: Arc<AtomicU64>,
}

impl ManifestStats {
    fn total_manifest_size(&self) -> u64 {
        self.total_manifest_size.load(Ordering::Relaxed)
    }

    fn manifest_version(&self) -> u64 {
        self.manifest_version.load(Ordering::Relaxed)
    }
}

#[cfg(test)]
mod tests {
    use std::sync::atomic::AtomicU64;
    use std::sync::Arc;

    use common_datasource::compression::CompressionType;
    use common_test_util::temp_dir::create_temp_dir;
    use crossbeam_utils::atomic::AtomicCell;
    use object_store::services::Fs;
    use object_store::ObjectStore;
    use store_api::logstore::provider::Provider;
    use store_api::region_engine::RegionRole;
    use store_api::region_request::PathType;
    use store_api::storage::RegionId;

    use crate::access_layer::AccessLayer;
    use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
    use crate::region::{
        ManifestContext, ManifestStats, MitoRegion, RegionLeaderState, RegionRoleState,
    };
    use crate::sst::index::intermediate::IntermediateManager;
    use crate::sst::index::puffin_manager::PuffinManagerFactory;
    use crate::test_util::memtable_util::EmptyMemtableBuilder;
    use crate::test_util::scheduler_util::SchedulerEnv;
    use crate::test_util::version_util::VersionControlBuilder;
    use crate::time_provider::StdTimeProvider;

    #[test]
    fn test_region_state_lock_free() {
        assert!(AtomicCell::<RegionRoleState>::is_lock_free());
    }

    #[tokio::test]
    async fn test_set_region_state() {
        let env = SchedulerEnv::new().await;
        let builder = VersionControlBuilder::new();
        let version_control = Arc::new(builder.build());
        let manifest_ctx = env
            .mock_manifest_context(version_control.current().version.metadata.clone())
            .await;

        let region_id = RegionId::new(1024, 0);
        // Leader -> Follower
        manifest_ctx.set_role(RegionRole::Follower, region_id);
        assert_eq!(manifest_ctx.state.load(), RegionRoleState::Follower);

        // Follower -> Leader
        manifest_ctx.set_role(RegionRole::Leader, region_id);
        assert_eq!(
            manifest_ctx.state.load(),
            RegionRoleState::Leader(RegionLeaderState::Writable)
        );

        // Leader -> Downgrading Leader
        manifest_ctx.set_role(RegionRole::DowngradingLeader, region_id);
        assert_eq!(
            manifest_ctx.state.load(),
            RegionRoleState::Leader(RegionLeaderState::Downgrading)
        );

        // Downgrading Leader -> Follower
        manifest_ctx.set_role(RegionRole::Follower, region_id);
        assert_eq!(manifest_ctx.state.load(), RegionRoleState::Follower);

        // Can't downgrade from follower (Follower -> Downgrading Leader)
        manifest_ctx.set_role(RegionRole::DowngradingLeader, region_id);
        assert_eq!(manifest_ctx.state.load(), RegionRoleState::Follower);

        // Set region role to Downgrading Leader
        manifest_ctx.set_role(RegionRole::Leader, region_id);
        manifest_ctx.set_role(RegionRole::DowngradingLeader, region_id);
        assert_eq!(
            manifest_ctx.state.load(),
            RegionRoleState::Leader(RegionLeaderState::Downgrading)
        );

        // Downgrading Leader -> Leader
        manifest_ctx.set_role(RegionRole::Leader, region_id);
        assert_eq!(
            manifest_ctx.state.load(),
            RegionRoleState::Leader(RegionLeaderState::Writable)
        );
    }

    #[tokio::test]
    async fn test_staging_state_validation() {
        let env = SchedulerEnv::new().await;
        let builder = VersionControlBuilder::new();
        let version_control = Arc::new(builder.build());

        // Create context with staging state using the correct pattern from SchedulerEnv
        let staging_ctx = {
            let manager = RegionManifestManager::new(
                version_control.current().version.metadata.clone(),
                0,
                RegionManifestOptions {
                    manifest_dir: "".to_string(),
                    object_store: env.access_layer.object_store().clone(),
                    compress_type: CompressionType::Uncompressed,
                    checkpoint_distance: 10,
                    remove_file_options: Default::default(),
                },
                Default::default(),
                Default::default(),
            )
            .await
            .unwrap();
            Arc::new(ManifestContext::new(
                manager,
                RegionRoleState::Leader(RegionLeaderState::Staging),
            ))
        };

        // Test staging state behavior
        assert_eq!(
            staging_ctx.current_state(),
            RegionRoleState::Leader(RegionLeaderState::Staging)
        );

        // Test writable context for comparison
        let writable_ctx = env
            .mock_manifest_context(version_control.current().version.metadata.clone())
            .await;

        assert_eq!(
            writable_ctx.current_state(),
            RegionRoleState::Leader(RegionLeaderState::Writable)
        );
    }

    #[tokio::test]
    async fn test_staging_state_transitions() {
        let builder = VersionControlBuilder::new();
        let version_control = Arc::new(builder.build());
        let metadata = version_control.current().version.metadata.clone();

        // Create MitoRegion for testing state transitions
        let temp_dir = create_temp_dir("");
        let path_str = temp_dir.path().display().to_string();
        let fs_builder = Fs::default().root(&path_str);
        let object_store = ObjectStore::new(fs_builder).unwrap().finish();

        let index_aux_path = temp_dir.path().join("index_aux");
        let puffin_mgr = PuffinManagerFactory::new(&index_aux_path, 4096, None, None)
            .await
            .unwrap();
        let intm_mgr = IntermediateManager::init_fs(index_aux_path.to_str().unwrap())
            .await
            .unwrap();

        let access_layer = Arc::new(AccessLayer::new(
            "",
            PathType::Bare,
            object_store,
            puffin_mgr,
            intm_mgr,
        ));

        let manager = RegionManifestManager::new(
            metadata.clone(),
            0,
            RegionManifestOptions {
                manifest_dir: "".to_string(),
                object_store: access_layer.object_store().clone(),
                compress_type: CompressionType::Uncompressed,
                checkpoint_distance: 10,
                remove_file_options: Default::default(),
            },
            Default::default(),
            Default::default(),
        )
        .await
        .unwrap();

        let manifest_ctx = Arc::new(ManifestContext::new(
            manager,
            RegionRoleState::Leader(RegionLeaderState::Writable),
        ));

        let region = MitoRegion {
            region_id: metadata.region_id,
            version_control,
            access_layer,
            manifest_ctx,
            file_purger: crate::test_util::new_noop_file_purger(),
            provider: Provider::noop_provider(),
            last_flush_millis: Default::default(),
            last_compaction_millis: Default::default(),
            time_provider: Arc::new(StdTimeProvider),
            topic_latest_entry_id: Default::default(),
            written_bytes: Arc::new(AtomicU64::new(0)),
            memtable_builder: Arc::new(EmptyMemtableBuilder::default()),
            stats: ManifestStats::default(),
        };

        // Test initial state
        assert_eq!(
            region.state(),
            RegionRoleState::Leader(RegionLeaderState::Writable)
        );
        assert!(!region.is_staging());

        // Test transition to staging
        region.set_staging().unwrap();
        assert_eq!(
            region.state(),
            RegionRoleState::Leader(RegionLeaderState::Staging)
        );
        assert!(region.is_staging());

        // Test transition back to writable
        region.exit_staging().unwrap();
        assert_eq!(
            region.state(),
            RegionRoleState::Leader(RegionLeaderState::Writable)
        );
        assert!(!region.is_staging());

        // Test invalid transitions
        assert!(region.set_staging().is_ok()); // Writable -> Staging should work
        assert!(region.set_staging().is_err()); // Staging -> Staging should fail
        assert!(region.exit_staging().is_ok()); // Staging -> Writable should work
        assert!(region.exit_staging().is_err()); // Writable -> Writable should fail
    }
}