Skip to main content

mito2/
region.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Mito region.
16
17pub mod catchup;
18pub mod opener;
19pub mod options;
20pub mod utils;
21pub(crate) mod version;
22
23use std::collections::hash_map::Entry;
24use std::collections::{HashMap, HashSet};
25use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
26use std::sync::{Arc, Mutex, RwLock};
27
28use common_base::hash::partition_expr_version;
29use common_telemetry::{error, info, warn};
30use crossbeam_utils::atomic::AtomicCell;
31use partition::expr::PartitionExpr;
32use snafu::{OptionExt, ResultExt, ensure};
33use store_api::ManifestVersion;
34use store_api::codec::PrimaryKeyEncoding;
35use store_api::logstore::provider::Provider;
36use store_api::metadata::RegionMetadataRef;
37use store_api::region_engine::{
38    RegionManifestInfo, RegionRole, RegionStatistic, SettableRegionRoleState,
39};
40use store_api::region_info::RegionInfoEntry;
41use store_api::region_request::{PathType, StagingPartitionDirective};
42use store_api::sst_entry::ManifestSstEntry;
43use store_api::storage::{FileId, RegionId, SequenceNumber};
44use tokio::sync::RwLockWriteGuard;
45pub use utils::*;
46
47use crate::access_layer::AccessLayerRef;
48use crate::error::{
49    FlushableRegionStateSnafu, InvalidPartitionExprSnafu, RegionNotFoundSnafu, RegionStateSnafu,
50    RegionTruncatedSnafu, Result, UnexpectedSnafu, UpdateManifestSnafu,
51};
52use crate::manifest::action::{
53    RegionChange, RegionManifest, RegionMetaAction, RegionMetaActionList,
54};
55use crate::manifest::manager::RegionManifestManager;
56use crate::region::version::{VersionControlRef, VersionRef};
57use crate::request::{OnFailure, OptionOutputTx};
58use crate::sst::file::FileMeta;
59use crate::sst::file_purger::FilePurgerRef;
60use crate::sst::location::{index_file_path, sst_file_path};
61use crate::time_provider::TimeProviderRef;
62
/// Approximate ratio between the size of the WAL and the size of the memtables.
///
/// The WAL usage of a region is estimated by multiplying its memtable usage
/// by this factor.
const ESTIMATED_WAL_FACTOR: f32 = 0.42825;
65
/// Usage of a region: the region id together with its WAL usage, SST usage
/// and manifest usage.
#[derive(Debug)]
pub struct RegionUsage {
    /// Id of the region this usage belongs to.
    pub region_id: RegionId,
    /// WAL usage of the region.
    pub wal_usage: u64,
    /// SST usage of the region.
    pub sst_usage: u64,
    /// Manifest usage of the region.
    pub manifest_usage: u64,
}
74
75impl RegionUsage {
76    pub fn disk_usage(&self) -> u64 {
77        self.wal_usage + self.sst_usage + self.manifest_usage
78    }
79}
80
/// State of a region while it is a leader.
///
/// It is carried by [`RegionRoleState::Leader`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RegionLeaderState {
    /// The region is opened and is writable.
    Writable,
    /// The region is in staging mode: writable, but no checkpoint/compaction.
    Staging,
    /// The region is entering staging mode; write requests will be stalled.
    EnteringStaging,
    /// The region is altering.
    Altering,
    /// The region is dropping.
    Dropping,
    /// The region is truncating.
    Truncating,
    /// The region is handling a region edit.
    Editing,
    /// The region is stepping down.
    Downgrading,
}
100
/// Role state of a region: either a leader in a specific leader state,
/// or a follower.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RegionRoleState {
    /// The region is a leader; the payload is its current leader state.
    Leader(RegionLeaderState),
    /// The region is a follower.
    Follower,
}
106
107impl RegionRoleState {
108    /// Converts the region role state to leader state if it is a leader state.
109    pub fn into_leader_state(self) -> Option<RegionLeaderState> {
110        match self {
111            RegionRoleState::Leader(leader_state) => Some(leader_state),
112            RegionRoleState::Follower => None,
113        }
114    }
115
116    pub(crate) fn as_str(&self) -> &'static str {
117        match self {
118            RegionRoleState::Follower => "Follower",
119            RegionRoleState::Leader(RegionLeaderState::Writable) => "Leader(Writable)",
120            RegionRoleState::Leader(RegionLeaderState::Staging) => "Leader(Staging)",
121            RegionRoleState::Leader(RegionLeaderState::EnteringStaging) => {
122                "Leader(EnteringStaging)"
123            }
124            RegionRoleState::Leader(RegionLeaderState::Altering) => "Leader(Altering)",
125            RegionRoleState::Leader(RegionLeaderState::Dropping) => "Leader(Dropping)",
126            RegionRoleState::Leader(RegionLeaderState::Truncating) => "Leader(Truncating)",
127            RegionRoleState::Leader(RegionLeaderState::Editing) => "Leader(Editing)",
128            RegionRoleState::Leader(RegionLeaderState::Downgrading) => "Leader(Downgrading)",
129        }
130    }
131}
132
/// Metadata and runtime status of a region.
///
/// Writing and reading a region follow a single-writer-multi-reader rule:
/// - Only the region worker thread this region belongs to can modify the metadata.
/// - Multiple reader threads are allowed to read a specific `version` of a region.
#[derive(Debug)]
pub struct MitoRegion {
    /// Id of this region.
    ///
    /// Accessing region id from the version control is inconvenient so
    /// we also store it here.
    pub(crate) region_id: RegionId,

    /// Version controller for this region.
    ///
    /// We MUST update the version control inside the write lock of the region manifest manager.
    pub(crate) version_control: VersionControlRef,
    /// SSTs accessor for this region.
    pub(crate) access_layer: AccessLayerRef,
    /// Context to maintain manifest for this region.
    ///
    /// It also holds the role state of the region.
    pub(crate) manifest_ctx: ManifestContextRef,
    /// SST file purger.
    pub(crate) file_purger: FilePurgerRef,
    /// The provider of log store.
    pub(crate) provider: Provider,
    /// Last flush time in millis.
    last_flush_millis: AtomicI64,
    /// Last compaction time in millis.
    last_compaction_millis: AtomicI64,
    /// Provider to get current time.
    time_provider: TimeProviderRef,
    /// The topic's latest entry id since the region's last flushing.
    /// **Only used for remote WAL pruning.**
    ///
    /// The value will be updated to the latest offset of the topic
    /// if region receives a flush request or schedules a periodic flush task
    /// and the region's memtable is empty.
    ///
    /// There are no WAL entries in range [flushed_entry_id, topic_latest_entry_id] for current region,
    /// which means these WAL entries may be able to be pruned up to `topic_latest_entry_id`.
    pub(crate) topic_latest_entry_id: AtomicU64,
    /// The total bytes written to the region.
    pub(crate) written_bytes: Arc<AtomicU64>,
    /// Statistics of the region manifest, such as the total manifest size,
    /// the manifest version and the count of removed files.
    stats: ManifestStats,
}
179
/// Shared reference to a [`MitoRegion`].
pub type MitoRegionRef = Arc<MitoRegion>;
181
/// A staging partition directive together with the partition rule version
/// derived from it.
#[derive(Debug, Clone)]
pub(crate) struct StagingPartitionInfo {
    /// The directive to apply to the partition while staging.
    pub(crate) partition_directive: StagingPartitionDirective,
    /// Version marker derived from the directive: computed from the partition
    /// expression for `UpdatePartitionExpr` and `0` for `RejectAllWrites`.
    pub(crate) partition_rule_version: u64,
}
187
188impl StagingPartitionInfo {
189    /// Returns the partition expression carried by the staging directive, if any.
190    pub(crate) fn partition_expr(&self) -> Option<&str> {
191        self.partition_directive.partition_expr()
192    }
193
194    /// Builds staging partition info from a directive and derives its version marker.
195    pub(crate) fn from_partition_directive(partition_directive: StagingPartitionDirective) -> Self {
196        let partition_rule_version = match &partition_directive {
197            StagingPartitionDirective::UpdatePartitionExpr(expr) => {
198                partition_expr_version(Some(expr))
199            }
200            StagingPartitionDirective::RejectAllWrites => 0,
201        };
202        Self {
203            partition_directive,
204            partition_rule_version,
205        }
206    }
207}
208
209impl MitoRegion {
210    /// Stop background managers for this region.
211    pub(crate) async fn stop(&self) {
212        self.manifest_ctx
213            .manifest_manager
214            .write()
215            .await
216            .stop()
217            .await;
218
219        info!(
220            "Stopped region manifest manager, region_id: {}",
221            self.region_id
222        );
223    }
224
225    /// Returns current metadata of the region.
226    pub fn metadata(&self) -> RegionMetadataRef {
227        let version_data = self.version_control.current();
228        version_data.version.metadata.clone()
229    }
230
231    /// Returns primary key encoding of the region.
232    pub(crate) fn primary_key_encoding(&self) -> PrimaryKeyEncoding {
233        let version_data = self.version_control.current();
234        version_data.version.metadata.primary_key_encoding
235    }
236
237    /// Returns current version of the region.
238    pub(crate) fn version(&self) -> VersionRef {
239        let version_data = self.version_control.current();
240        version_data.version
241    }
242
243    /// Returns last flush timestamp in millis.
244    pub(crate) fn last_flush_millis(&self) -> i64 {
245        self.last_flush_millis.load(Ordering::Relaxed)
246    }
247
248    /// Update flush time to current time.
249    pub(crate) fn update_flush_millis(&self) {
250        let now = self.time_provider.current_time_millis();
251        self.last_flush_millis.store(now, Ordering::Relaxed);
252    }
253
254    /// Returns last compaction timestamp in millis.
255    pub(crate) fn last_compaction_millis(&self) -> i64 {
256        self.last_compaction_millis.load(Ordering::Relaxed)
257    }
258
259    /// Update compaction time to current time.
260    pub(crate) fn update_compaction_millis(&self) {
261        let now = self.time_provider.current_time_millis();
262        self.last_compaction_millis.store(now, Ordering::Relaxed);
263    }
264
265    /// Returns the table dir.
266    pub(crate) fn table_dir(&self) -> &str {
267        self.access_layer.table_dir()
268    }
269
270    /// Returns the path type of the region.
271    pub(crate) fn path_type(&self) -> PathType {
272        self.access_layer.path_type()
273    }
274
275    /// Returns whether the region is writable.
276    pub(crate) fn is_writable(&self) -> bool {
277        matches!(
278            self.manifest_ctx.state.load(),
279            RegionRoleState::Leader(RegionLeaderState::Writable)
280                | RegionRoleState::Leader(RegionLeaderState::Staging)
281        )
282    }
283
284    /// Returns whether the region is flushable.
285    pub(crate) fn is_flushable(&self) -> bool {
286        matches!(
287            self.manifest_ctx.state.load(),
288            RegionRoleState::Leader(RegionLeaderState::Writable)
289                | RegionRoleState::Leader(RegionLeaderState::Staging)
290                | RegionRoleState::Leader(RegionLeaderState::Downgrading)
291        )
292    }
293
294    /// Returns whether the region should abort index building.
295    pub(crate) fn should_abort_index(&self) -> bool {
296        matches!(
297            self.manifest_ctx.state.load(),
298            RegionRoleState::Follower
299                | RegionRoleState::Leader(RegionLeaderState::Dropping)
300                | RegionRoleState::Leader(RegionLeaderState::Truncating)
301                | RegionRoleState::Leader(RegionLeaderState::Downgrading)
302                | RegionRoleState::Leader(RegionLeaderState::Staging)
303        )
304    }
305
306    /// Returns whether the region is downgrading.
307    pub(crate) fn is_downgrading(&self) -> bool {
308        matches!(
309            self.manifest_ctx.state.load(),
310            RegionRoleState::Leader(RegionLeaderState::Downgrading)
311        )
312    }
313
314    /// Returns whether the region is in staging mode.
315    pub(crate) fn is_staging(&self) -> bool {
316        self.manifest_ctx.state.load() == RegionRoleState::Leader(RegionLeaderState::Staging)
317    }
318
319    /// Returns whether the region is entering staging mode.
320    pub(crate) fn is_enter_staging(&self) -> bool {
321        self.manifest_ctx.state.load()
322            == RegionRoleState::Leader(RegionLeaderState::EnteringStaging)
323    }
324
325    pub fn region_id(&self) -> RegionId {
326        self.region_id
327    }
328
329    pub fn find_committed_sequence(&self) -> SequenceNumber {
330        self.version_control.committed_sequence()
331    }
332
333    /// Returns the latest sequence that has already been persisted into SSTs.
334    ///
335    /// Incremental memtable-only reads must use a cursor greater than or equal to
336    /// this boundary; older cursors are stale because the corresponding updates may
337    /// already have been flushed out of memtables.
338    pub fn flushed_sequence(&self) -> SequenceNumber {
339        self.version_control.current().version.flushed_sequence
340    }
341
342    /// Returns whether the region is readonly.
343    pub fn is_follower(&self) -> bool {
344        self.manifest_ctx.state.load() == RegionRoleState::Follower
345    }
346
347    /// Returns the state of the region.
348    pub(crate) fn state(&self) -> RegionRoleState {
349        self.manifest_ctx.state.load()
350    }
351
352    /// Sets the region role state.
353    pub(crate) fn set_role(&self, next_role: RegionRole) {
354        self.manifest_ctx.set_role(next_role, self.region_id);
355    }
356
357    pub(crate) fn region_role(&self) -> RegionRole {
358        match self.state() {
359            RegionRoleState::Follower => RegionRole::Follower,
360            RegionRoleState::Leader(RegionLeaderState::Staging) => RegionRole::StagingLeader,
361            RegionRoleState::Leader(RegionLeaderState::Downgrading) => {
362                RegionRole::DowngradingLeader
363            }
364            RegionRoleState::Leader(_) => RegionRole::Leader,
365        }
366    }
367
368    /// Sets the altering state.
369    /// You should call this method in the worker loop.
370    pub(crate) fn set_altering(&self) -> Result<()> {
371        self.compare_exchange_state(
372            RegionLeaderState::Writable,
373            RegionRoleState::Leader(RegionLeaderState::Altering),
374        )
375    }
376
377    /// Sets the dropping state.
378    /// You should call this method in the worker loop.
379    pub(crate) fn set_dropping(&self, expect: RegionLeaderState) -> Result<()> {
380        self.compare_exchange_state(expect, RegionRoleState::Leader(RegionLeaderState::Dropping))
381    }
382
383    /// Sets the truncating state.
384    /// You should call this method in the worker loop.
385    pub(crate) fn set_truncating(&self) -> Result<()> {
386        self.compare_exchange_state(
387            RegionLeaderState::Writable,
388            RegionRoleState::Leader(RegionLeaderState::Truncating),
389        )
390    }
391
392    /// Sets the editing state.
393    /// You should call this method in the worker loop.
394    pub(crate) fn set_editing(&self, expect: RegionLeaderState) -> Result<()> {
395        self.compare_exchange_state(expect, RegionRoleState::Leader(RegionLeaderState::Editing))
396    }
397
398    /// Sets the staging state.
399    ///
400    /// You should call this method in the worker loop.
401    /// Transitions from Writable to Staging state.
402    /// Cleans any existing staging manifests before entering staging mode.
403    pub(crate) async fn set_staging(
404        &self,
405        manager: &mut RwLockWriteGuard<'_, RegionManifestManager>,
406    ) -> Result<()> {
407        manager.store().clear_staging_manifests().await?;
408
409        self.compare_exchange_state(
410            RegionLeaderState::Writable,
411            RegionRoleState::Leader(RegionLeaderState::Staging),
412        )
413    }
414
415    /// Sets the entering staging state.
416    pub(crate) fn set_entering_staging(&self) -> Result<()> {
417        self.compare_exchange_state(
418            RegionLeaderState::Writable,
419            RegionRoleState::Leader(RegionLeaderState::EnteringStaging),
420        )
421    }
422
423    /// Exits the staging state back to writable.
424    ///
425    /// You should call this method in the worker loop.
426    /// Transitions from Staging to Writable state.
427    pub fn exit_staging(&self) -> Result<()> {
428        self.manifest_ctx.exit_staging(
429            self.region_id,
430            RegionRoleState::Leader(RegionLeaderState::Writable),
431        )
432    }
433
    /// Sets the region role state gracefully. This acquires the manifest write lock.
    ///
    /// The write lock of the manifest manager is held for the whole call, and
    /// the transition depends on the state observed right after taking it:
    /// - `Leader`: `Staging` exits staging through the success path;
    ///   `Writable` is a no-op; any other state is an error.
    /// - `StagingLeader`: `Writable` enters staging; `Staging` is a no-op;
    ///   any other state is an error.
    /// - `Follower`: any leader is demoted (a staging leader exits staging
    ///   first); a follower is a no-op.
    /// - `DowngradingLeader`: `Writable` and `Staging` leaders start
    ///   downgrading (a staging leader exits staging first); `Downgrading` is
    ///   a no-op; any other state only logs a warning and still returns `Ok`.
    ///
    /// If the region is a writable leader afterwards, metadata that was
    /// backfilled in memory but is missing from the manifest (the partition
    /// expr) is persisted on a best-effort basis; a failure there is only logged.
    pub(crate) async fn set_role_state_gracefully(
        &self,
        state: SettableRegionRoleState,
    ) -> Result<()> {
        // Hold the manifest write lock until the end of the function.
        let mut manager: RwLockWriteGuard<'_, RegionManifestManager> =
            self.manifest_ctx.manifest_manager.write().await;
        // Snapshot of the state that all branches below decide on.
        let current_state = self.state();

        match state {
            SettableRegionRoleState::Leader => {
                // Exit staging mode and return to normal writable leader.
                // Only allowed from staging state (or when already writable).
                match current_state {
                    RegionRoleState::Leader(RegionLeaderState::Staging) => {
                        info!("Exiting staging mode for region {}", self.region_id);
                        // Use the success exit path that merges all staged manifests
                        self.exit_staging_on_success(&mut manager).await?;
                    }
                    RegionRoleState::Leader(RegionLeaderState::Writable) => {
                        // Already in desired state - no-op
                        info!("Region {} already in normal leader mode", self.region_id);
                    }
                    _ => {
                        // Only staging -> leader transition is allowed
                        return Err(RegionStateSnafu {
                            region_id: self.region_id,
                            state: current_state,
                            expect: RegionRoleState::Leader(RegionLeaderState::Staging),
                        }
                        .build());
                    }
                }
            }

            SettableRegionRoleState::StagingLeader => {
                // Enter staging mode from normal writable leader.
                // Only allowed from writable leader state (or when already staging).
                match current_state {
                    RegionRoleState::Leader(RegionLeaderState::Writable) => {
                        info!("Entering staging mode for region {}", self.region_id);
                        self.set_staging(&mut manager).await?;
                    }
                    RegionRoleState::Leader(RegionLeaderState::Staging) => {
                        // Already in desired state - no-op
                        info!("Region {} already in staging mode", self.region_id);
                    }
                    _ => {
                        // Only writable -> staging transition is allowed
                        return Err(RegionStateSnafu {
                            region_id: self.region_id,
                            state: current_state,
                            expect: RegionRoleState::Leader(RegionLeaderState::Writable),
                        }
                        .build());
                    }
                }
            }

            SettableRegionRoleState::Follower => {
                // Make this region a follower
                match current_state {
                    RegionRoleState::Leader(RegionLeaderState::Staging) => {
                        info!(
                            "Exiting staging and demoting region {} to follower",
                            self.region_id
                        );
                        // Leave staging through `exit_staging` before changing the role.
                        self.exit_staging()?;
                        self.set_role(RegionRole::Follower);
                    }
                    RegionRoleState::Leader(_) => {
                        info!("Demoting region {} from leader to follower", self.region_id);
                        self.set_role(RegionRole::Follower);
                    }
                    RegionRoleState::Follower => {
                        // Already in desired state - no-op
                        info!("Region {} already in follower mode", self.region_id);
                    }
                }
            }

            SettableRegionRoleState::DowngradingLeader => {
                // downgrade this region to downgrading leader
                match current_state {
                    RegionRoleState::Leader(RegionLeaderState::Staging) => {
                        info!(
                            "Exiting staging and entering downgrade for region {}",
                            self.region_id
                        );
                        // Leave staging through `exit_staging` before changing the role.
                        self.exit_staging()?;
                        self.set_role(RegionRole::DowngradingLeader);
                    }
                    RegionRoleState::Leader(RegionLeaderState::Writable) => {
                        info!("Starting downgrade for region {}", self.region_id);
                        self.set_role(RegionRole::DowngradingLeader);
                    }
                    RegionRoleState::Leader(RegionLeaderState::Downgrading) => {
                        // Already in desired state - no-op
                        info!("Region {} already in downgrading mode", self.region_id);
                    }
                    _ => {
                        // The request is ignored for other states: this only
                        // warns and the function still returns `Ok(())`.
                        warn!(
                            "Cannot start downgrade for region {} from state {:?}",
                            self.region_id, current_state
                        );
                    }
                }
            }
        }

        // Hack(zhongzc): If we have just become leader (writable), persist any backfilled metadata.
        // Note that the state is re-read here, so this also runs when the
        // region was already writable before the call.
        if self.state() == RegionRoleState::Leader(RegionLeaderState::Writable) {
            // Persist backfilled metadata if manifest is missing fields (e.g., partition_expr)
            let manifest_meta = &manager.manifest().metadata;
            let current_version = self.version();
            let current_meta = &current_version.metadata;
            if manifest_meta.partition_expr.is_none() && current_meta.partition_expr.is_some() {
                let action = RegionMetaAction::Change(RegionChange {
                    metadata: current_meta.clone(),
                    sst_format: current_version.options.sst_format.unwrap_or_default(),
                    append_mode: None,
                });
                let result = manager
                    .update(RegionMetaActionList::with_action(action), false)
                    .await;

                // Best effort: a failed update is logged and does not fail the role change.
                match result {
                    Ok(version) => {
                        info!(
                            "Successfully persisted backfilled metadata for region {}, version: {}",
                            self.region_id, version
                        );
                    }
                    Err(e) => {
                        warn!(e; "Failed to persist backfilled metadata for region {}", self.region_id);
                    }
                }
            }
        }

        // Release the manifest write lock explicitly.
        drop(manager);

        Ok(())
    }
577
578    /// Switches the region state to `RegionRoleState::Leader(RegionLeaderState::Writable)` if the current state is `expect`.
579    /// Otherwise, logs an error.
580    pub(crate) fn switch_state_to_writable(&self, expect: RegionLeaderState) {
581        if let Err(e) = self
582            .compare_exchange_state(expect, RegionRoleState::Leader(RegionLeaderState::Writable))
583        {
584            error!(e; "failed to switch region state to writable, expect state is {:?}", expect);
585        }
586    }
587
588    /// Switches the region state to `RegionRoleState::Leader(RegionLeaderState::Staging)` if the current state is `expect`.
589    /// Otherwise, logs an error.
590    pub(crate) fn switch_state_to_staging(&self, expect: RegionLeaderState) {
591        if let Err(e) =
592            self.compare_exchange_state(expect, RegionRoleState::Leader(RegionLeaderState::Staging))
593        {
594            error!(e; "failed to switch region state to staging, expect state is {:?}", expect);
595        }
596    }
597
598    /// Returns the region statistic.
599    pub(crate) fn region_statistic(&self) -> RegionStatistic {
600        let version = self.version();
601        let memtables = &version.memtables;
602        let memtable_usage = (memtables.mutable_usage() + memtables.immutables_usage()) as u64;
603
604        let sst_usage = version.ssts.owned_sst_usage(self.region_id);
605        let index_usage = version.ssts.owned_index_usage(self.region_id);
606        let flushed_entry_id = version.flushed_entry_id;
607
608        let wal_usage = self.estimated_wal_usage(memtable_usage);
609        let manifest_usage = self.stats.total_manifest_size();
610        let num_rows = version.ssts.owned_num_rows(self.region_id) + version.memtables.num_rows();
611        let num_files = version.ssts.owned_num_files(self.region_id);
612        let manifest_version = self.stats.manifest_version();
613        let file_removed_cnt = self.stats.file_removed_cnt();
614
615        let topic_latest_entry_id = self.topic_latest_entry_id.load(Ordering::Relaxed);
616        let written_bytes = self.written_bytes.load(Ordering::Relaxed);
617
618        RegionStatistic {
619            num_rows,
620            memtable_size: memtable_usage,
621            wal_size: wal_usage,
622            manifest_size: manifest_usage,
623            sst_size: sst_usage,
624            sst_num: num_files,
625            index_size: index_usage,
626            manifest: RegionManifestInfo::Mito {
627                manifest_version,
628                flushed_entry_id,
629                file_removed_cnt,
630            },
631            data_topic_latest_entry_id: topic_latest_entry_id,
632            metadata_topic_latest_entry_id: topic_latest_entry_id,
633            written_bytes,
634        }
635    }
636
637    /// Estimated WAL size in bytes.
638    /// Use the memtables size to estimate the size of wal.
639    fn estimated_wal_usage(&self, memtable_usage: u64) -> u64 {
640        ((memtable_usage as f32) * ESTIMATED_WAL_FACTOR) as u64
641    }
642
643    /// Sets the state of the region to given state if the current state equals to
644    /// the expected.
645    fn compare_exchange_state(
646        &self,
647        expect: RegionLeaderState,
648        state: RegionRoleState,
649    ) -> Result<()> {
650        self.manifest_ctx
651            .state
652            .compare_exchange(RegionRoleState::Leader(expect), state)
653            .map_err(|actual| {
654                RegionStateSnafu {
655                    region_id: self.region_id,
656                    state: actual,
657                    expect: RegionRoleState::Leader(expect),
658                }
659                .build()
660            })?;
661        Ok(())
662    }
663
664    pub fn access_layer(&self) -> AccessLayerRef {
665        self.access_layer.clone()
666    }
667
668    /// Returns the region info entry of the region.
669    pub(crate) fn region_info_entry(&self, node_id: Option<u64>) -> RegionInfoEntry {
670        let region_id = self.region_id;
671        let version = self.version();
672        let state = self.state();
673        let role = self.region_role();
674        let region_options = serde_json::to_string(&version.options)
675            .unwrap_or_else(|err| serde_json::json!({ "error": err.to_string() }).to_string());
676        let sst_format = match version.options.sst_format.unwrap_or_default() {
677            crate::sst::FormatType::PrimaryKey => "primary_key",
678            crate::sst::FormatType::Flat => "flat",
679        }
680        .to_string();
681
682        RegionInfoEntry {
683            region_id,
684            table_id: region_id.table_id(),
685            region_number: region_id.region_number(),
686            region_group: region_id.region_group(),
687            region_sequence: region_id.region_sequence(),
688            state: state.as_str().to_string(),
689            role: role.to_string(),
690            writable: self.is_writable(),
691            committed_sequence: self.find_committed_sequence(),
692            flushed_sequence: Some(self.flushed_sequence()).filter(|sequence| *sequence > 0),
693            manifest_version: self.stats.manifest_version(),
694            compaction_time_window: version
695                .compaction_time_window
696                .map(|duration| humantime::format_duration(duration).to_string()),
697            region_options,
698            sst_format,
699            node_id,
700        }
701    }
702
703    /// Returns the SST entries of the region.
704    pub async fn manifest_sst_entries(&self) -> Vec<ManifestSstEntry> {
705        let table_dir = self.table_dir();
706        let path_type = self.access_layer.path_type();
707
708        let visible_ssts = self
709            .version()
710            .ssts
711            .levels()
712            .iter()
713            .flat_map(|level| level.files().map(|file| file.file_id().file_id()))
714            .collect::<HashSet<_>>();
715
716        let manifest_files = self.manifest_ctx.manifest().await.files.clone();
717        let staging_files = self
718            .manifest_ctx
719            .staging_manifest()
720            .await
721            .map(|m| m.files.clone())
722            .unwrap_or_default();
723        let files = manifest_files
724            .into_iter()
725            .chain(staging_files)
726            .collect::<HashMap<_, _>>();
727
728        files
729            .values()
730            .map(|meta| {
731                let region_id = self.region_id;
732                let origin_region_id = meta.region_id;
733                let (index_version, index_file_path, index_file_size) = if meta.index_file_size > 0
734                {
735                    let index_file_path = index_file_path(table_dir, meta.index_id(), path_type);
736                    (
737                        meta.index_version,
738                        Some(index_file_path),
739                        Some(meta.index_file_size),
740                    )
741                } else {
742                    (0, None, None)
743                };
744                let visible = visible_ssts.contains(&meta.file_id);
745                ManifestSstEntry {
746                    table_dir: table_dir.to_string(),
747                    region_id,
748                    table_id: region_id.table_id(),
749                    region_number: region_id.region_number(),
750                    region_group: region_id.region_group(),
751                    region_sequence: region_id.region_sequence(),
752                    file_id: meta.file_id.to_string(),
753                    index_version,
754                    level: meta.level,
755                    file_path: sst_file_path(table_dir, meta.file_id(), path_type),
756                    file_size: meta.file_size,
757                    index_file_path,
758                    index_file_size,
759                    num_rows: meta.num_rows,
760                    num_row_groups: meta.num_row_groups,
761                    num_series: Some(meta.num_series),
762                    min_ts: meta.time_range.0,
763                    max_ts: meta.time_range.1,
764                    sequence: meta.sequence.map(|s| s.get()),
765                    origin_region_id,
766                    node_id: None,
767                    visible,
768                    primary_key_min: meta.primary_key_min.clone(),
769                    primary_key_max: meta.primary_key_max.clone(),
770                }
771            })
772            .collect()
773    }
774
775    /// Returns the file metas of the region by file ids.
776    pub async fn file_metas(&self, file_ids: &[FileId]) -> Vec<Option<FileMeta>> {
777        let manifest_files = self.manifest_ctx.manifest().await.files.clone();
778
779        file_ids
780            .iter()
781            .map(|file_id| manifest_files.get(file_id).cloned())
782            .collect::<Vec<_>>()
783    }
784
785    /// Exit staging mode successfully by merging all staged manifests and making them visible.
786    pub(crate) async fn exit_staging_on_success(
787        &self,
788        manager: &mut RwLockWriteGuard<'_, RegionManifestManager>,
789    ) -> Result<()> {
790        let current_state = self.manifest_ctx.current_state();
791        ensure!(
792            current_state == RegionRoleState::Leader(RegionLeaderState::Staging),
793            RegionStateSnafu {
794                region_id: self.region_id,
795                state: current_state,
796                expect: RegionRoleState::Leader(RegionLeaderState::Staging),
797            }
798        );
799
800        // Merge all staged manifest actions
801        let merged_actions = match manager.merge_staged_actions(current_state).await? {
802            Some(actions) => actions,
803            None => {
804                info!(
805                    "No staged manifests to merge for region {}, exiting staging mode without changes",
806                    self.region_id
807                );
808                // Even if no manifests to merge, we still need to exit staging mode
809                self.exit_staging()?;
810                return Ok(());
811            }
812        };
813        let expect_change = merged_actions.actions.iter().any(|a| a.is_change());
814        let expect_partition_expr_change = merged_actions
815            .actions
816            .iter()
817            .any(|a| a.is_partition_expr_change());
818        let expect_edit = merged_actions.actions.iter().any(|a| a.is_edit());
819        ensure!(
820            !(expect_change && expect_partition_expr_change),
821            UnexpectedSnafu {
822                reason: "unexpected both change and partition expr change actions in merged actions"
823            }
824        );
825        ensure!(
826            expect_change || expect_partition_expr_change,
827            UnexpectedSnafu {
828                reason: "expect a change or partition expr change action in merged actions"
829            }
830        );
831        ensure!(
832            expect_edit,
833            UnexpectedSnafu {
834                reason: "expect an edit action in merged actions"
835            }
836        );
837
838        let (merged_partition_expr_change, merged_change, merged_edit) =
839            merged_actions.clone().split_region_change_and_edit();
840        if let Some(change) = &merged_change {
841            // In staging exit we only allow metadata-only updates. A `Change`
842            // action is accepted only when column definitions are unchanged;
843            // otherwise it is treated as a schema change and rejected.
844            let current_column_metadatas = &self.version().metadata.column_metadatas;
845            ensure!(
846                change.metadata.column_metadatas == *current_column_metadatas,
847                UnexpectedSnafu {
848                    reason: "change action alters column metadata in staging exit"
849                }
850            );
851        }
852
853        // Submit merged actions using the manifest manager's update method
854        // Pass the `false` so it saves to normal directory, not staging
855        let new_version = manager.update(merged_actions, false).await?;
856        info!(
857            "Successfully submitted merged staged manifests for region {}, new version: {}",
858            self.region_id, new_version
859        );
860
861        // Apply the merged changes to in-memory version control
862        if let Some(change) = merged_partition_expr_change {
863            let mut new_metadata = self.version().metadata.as_ref().clone();
864            new_metadata.set_partition_expr(change.partition_expr);
865            self.version_control.alter_metadata(new_metadata.into());
866        }
867        if let Some(change) = merged_change {
868            self.version_control.alter_metadata(change.metadata);
869        }
870        self.version_control
871            .apply_edit(Some(merged_edit), &[], self.file_purger.clone());
872
873        // Clear all staging manifests and transit state
874        if let Err(e) = manager.clear_staging_manifest_and_dir().await {
875            error!(e; "Failed to clear staging manifest dir for region {}", self.region_id);
876        }
877        self.exit_staging()?;
878
879        Ok(())
880    }
881
882    /// Returns the partition expression string for this region.
883    ///
884    /// If the region is currently in staging state, this returns the partition expression held in
885    /// the staging partition field. Otherwise, it returns the partition expression from the primary
886    /// region metadata (current committed version).
887    pub fn maybe_staging_partition_expr_str(&self) -> Option<String> {
888        let is_staging = self.is_staging();
889        if is_staging {
890            let staging_partition_info = self.manifest_ctx.staging_partition_info();
891            if staging_partition_info.is_none() {
892                warn!(
893                    "Staging partition expr is none for region {} in staging state",
894                    self.region_id
895                );
896            }
897            staging_partition_info
898                .as_ref()
899                .and_then(|info| info.partition_expr().map(ToString::to_string))
900        } else {
901            let version = self.version();
902            version.metadata.partition_expr.clone()
903        }
904    }
905
906    pub fn expected_partition_expr_version(&self) -> u64 {
907        if self.is_staging() {
908            self.manifest_ctx
909                .staging_partition_info()
910                .as_ref()
911                .map(|info| info.partition_rule_version)
912                .unwrap_or_default()
913        } else {
914            self.version().metadata.partition_expr_version
915        }
916    }
917
918    /// Returns whether writes should be rejected for this region in staging mode.
919    pub(crate) fn reject_all_writes_in_staging(&self) -> bool {
920        if !self.is_staging() {
921            return false;
922        }
923        self.manifest_ctx
924            .staging_partition_info()
925            .as_ref()
926            .map(|info| {
927                matches!(
928                    info.partition_directive,
929                    StagingPartitionDirective::RejectAllWrites
930                )
931            })
932            .unwrap_or(false)
933    }
934}
935
/// Context to update the region manifest.
///
/// It bundles the manifest manager with the region role state and the partition info
/// that is only relevant while the region is in staging mode.
#[derive(Debug)]
pub(crate) struct ManifestContext {
    /// Manager to maintain manifest for this region.
    ///
    /// Manifest updates take the write side of this lock; readers take the read side.
    pub(crate) manifest_manager: tokio::sync::RwLock<RegionManifestManager>,
    /// The state of the region. The region checks the state before updating
    /// manifest.
    state: AtomicCell<RegionRoleState>,
    /// Partition info of the region in staging mode.
    ///
    /// During the staging mode, the region metadata in [`VersionControlRef`] is not updated,
    /// so we need to store the partition info separately.
    ///
    /// It is `None` initially and is reset to `None` when the region exits staging.
    staging_partition_info: Mutex<Option<StagingPartitionInfo>>,
}
950
951impl ManifestContext {
952    pub(crate) fn new(manager: RegionManifestManager, state: RegionRoleState) -> Self {
953        ManifestContext {
954            manifest_manager: tokio::sync::RwLock::new(manager),
955            state: AtomicCell::new(state),
956            staging_partition_info: Mutex::new(None),
957        }
958    }
959
960    pub(crate) fn staging_partition_info(&self) -> Option<StagingPartitionInfo> {
961        self.staging_partition_info.lock().unwrap().clone()
962    }
963
964    pub(crate) fn set_staging_partition_info(&self, staging_partition_info: StagingPartitionInfo) {
965        let mut current = self.staging_partition_info.lock().unwrap();
966        debug_assert!(current.is_none());
967        *current = Some(staging_partition_info);
968    }
969
970    fn clear_staging_partition_info(&self) {
971        *self.staging_partition_info.lock().unwrap() = None;
972    }
973
974    pub(crate) fn exit_staging(
975        &self,
976        region_id: RegionId,
977        next_state: RegionRoleState,
978    ) -> Result<()> {
979        self.state
980            .compare_exchange(
981                RegionRoleState::Leader(RegionLeaderState::Staging),
982                next_state,
983            )
984            .map_err(|actual| {
985                RegionStateSnafu {
986                    region_id,
987                    state: actual,
988                    expect: RegionRoleState::Leader(RegionLeaderState::Staging),
989                }
990                .build()
991            })?;
992        self.clear_staging_partition_info();
993        Ok(())
994    }
995
996    pub(crate) async fn manifest_version(&self) -> ManifestVersion {
997        self.manifest_manager
998            .read()
999            .await
1000            .manifest()
1001            .manifest_version
1002    }
1003
1004    pub(crate) async fn has_update(&self) -> Result<bool> {
1005        self.manifest_manager.read().await.has_update().await
1006    }
1007
1008    /// Returns the current region role state.
1009    pub(crate) fn current_state(&self) -> RegionRoleState {
1010        self.state.load()
1011    }
1012
1013    /// Installs the manifest changes from the current version to the target version (inclusive).
1014    ///
1015    /// Returns installed [RegionManifest].
1016    /// **Note**: This function is not guaranteed to install the target version strictly.
1017    /// The installed version may be greater than the target version.
1018    pub(crate) async fn install_manifest_to(
1019        &self,
1020        version: ManifestVersion,
1021    ) -> Result<Arc<RegionManifest>> {
1022        let mut manager = self.manifest_manager.write().await;
1023        manager.install_manifest_to(version).await?;
1024
1025        Ok(manager.manifest())
1026    }
1027
1028    /// Updates the manifest if current state is `expect_state`.
1029    pub(crate) async fn update_manifest(
1030        &self,
1031        expect_state: RegionLeaderState,
1032        action_list: RegionMetaActionList,
1033        is_staging: bool,
1034    ) -> Result<ManifestVersion> {
1035        self.update_manifest_with_state_check(action_list, is_staging, |current_state, region_id| {
1036            // If expect_state is not downgrading, the current state must be either `expect_state` or downgrading.
1037            //
1038            // A downgrading leader rejects user writes but still allows
1039            // flushing the memtable and updating the manifest.
1040            if expect_state != RegionLeaderState::Downgrading {
1041                if current_state == RegionRoleState::Leader(RegionLeaderState::Downgrading) {
1042                    info!(
1043                        "Region {} is in downgrading leader state, updating manifest. Expect state is {:?}",
1044                        region_id, expect_state
1045                    );
1046                }
1047                ensure!(
1048                    current_state == RegionRoleState::Leader(expect_state)
1049                        || current_state == RegionRoleState::Leader(RegionLeaderState::Downgrading),
1050                    UpdateManifestSnafu {
1051                        region_id,
1052                        state: current_state,
1053                    }
1054                );
1055            } else {
1056                ensure!(
1057                    current_state == RegionRoleState::Leader(expect_state),
1058                    RegionStateSnafu {
1059                        region_id,
1060                        state: current_state,
1061                        expect: RegionRoleState::Leader(expect_state),
1062                    }
1063                );
1064            }
1065
1066            Ok(())
1067        })
1068        .await
1069    }
1070
1071    /// Updates the manifest for compaction.
1072    ///
1073    /// Compaction may finish while a direct external region edit is in the transient
1074    /// `Editing` state. Direct external edits can remove files both when followers
1075    /// apply sync-region metadata and when a writable leader performs a direct edit
1076    /// such as `edit_region()`. Allowing compaction to publish in `Editing` is still
1077    /// safe because publication happens under the manifest write lock and compaction
1078    /// rechecks that its input files are still valid before committing.
1079    ///
1080    /// This intentionally writes to the normal manifest path (`is_staging = false`).
1081    /// Entering staging cancels or waits for active compactions before switching the
1082    /// region to `Staging`, so a compaction that started before staging still finishes
1083    /// against the normal manifest. Even if a manual compaction is requested while the
1084    /// region is already staging, compaction only sees SSTs in the normal visible
1085    /// region version; SSTs from staging manifests are not applied to region version
1086    /// control until staging exits successfully.
1087    pub(crate) async fn update_manifest_for_compaction(
1088        &self,
1089        action_list: RegionMetaActionList,
1090    ) -> Result<ManifestVersion> {
1091        self.update_manifest_with_state_check(action_list, false, |current_state, region_id| {
1092            ensure!(
1093                matches!(
1094                    current_state,
1095                    RegionRoleState::Leader(RegionLeaderState::Writable)
1096                        | RegionRoleState::Leader(RegionLeaderState::Editing)
1097                        | RegionRoleState::Leader(RegionLeaderState::Downgrading)
1098                ),
1099                UpdateManifestSnafu {
1100                    region_id,
1101                    state: current_state,
1102                }
1103            );
1104
1105            Ok(())
1106        })
1107        .await
1108    }
1109
1110    async fn update_manifest_with_state_check(
1111        &self,
1112        action_list: RegionMetaActionList,
1113        is_staging: bool,
1114        check_state: impl FnOnce(RegionRoleState, RegionId) -> Result<()>,
1115    ) -> Result<ManifestVersion> {
1116        // Acquires the write lock of the manifest manager.
1117        let mut manager = self.manifest_manager.write().await;
1118        // Gets current manifest.
1119        let manifest = manager.manifest();
1120        // Checks state inside the lock. This is to ensure that we won't update the manifest
1121        // after `set_readonly_gracefully()` is called.
1122        let current_state = self.state.load();
1123        check_state(current_state, manifest.metadata.region_id)?;
1124
1125        for action in &action_list.actions {
1126            // Checks whether the edit is still applicable.
1127            let RegionMetaAction::Edit(edit) = &action else {
1128                continue;
1129            };
1130
1131            // Checks whether the region is truncated.
1132            let Some(truncated_entry_id) = manifest.truncated_entry_id else {
1133                continue;
1134            };
1135
1136            // This is an edit from flush.
1137            if let Some(flushed_entry_id) = edit.flushed_entry_id {
1138                // A flush edit is valid after truncate in two cases:
1139                // 1. `flushed_entry_id` moves past `truncated_entry_id`, meaning it definitely
1140                //    flushed data newer than the truncate point.
1141                // 2. `flushed_entry_id` equals `truncated_entry_id`, but `flushed_sequence`
1142                //    increases. This happens in skip-WAL tables where entry id can stay at 0,
1143                //    while sequence still advances for post-truncate writes.
1144                //
1145                // We still reject stale flushes from before truncate:
1146                // if entry id is equal and sequence does not advance, the flush is outdated.
1147                let is_newer_entry = truncated_entry_id < flushed_entry_id;
1148                let is_same_entry_with_newer_sequence = truncated_entry_id == flushed_entry_id
1149                    && edit.flushed_sequence.is_some_and(|flushed_sequence| {
1150                        manifest.flushed_sequence < flushed_sequence
1151                    });
1152
1153                ensure!(
1154                    is_newer_entry || is_same_entry_with_newer_sequence,
1155                    RegionTruncatedSnafu {
1156                        region_id: manifest.metadata.region_id,
1157                    }
1158                );
1159            }
1160
1161            // This is an edit from compaction.
1162            if !edit.files_to_remove.is_empty() {
1163                // Input files of the compaction task has been truncated.
1164                for file in &edit.files_to_remove {
1165                    ensure!(
1166                        manifest.files.contains_key(&file.file_id),
1167                        RegionTruncatedSnafu {
1168                            region_id: manifest.metadata.region_id,
1169                        }
1170                    );
1171                }
1172            }
1173        }
1174
1175        // Now we can update the manifest.
1176        let version = manager.update(action_list, is_staging).await.inspect_err(
1177            |e| error!(e; "Failed to update manifest, region_id: {}", manifest.metadata.region_id),
1178        )?;
1179
1180        if self.state.load() == RegionRoleState::Follower {
1181            warn!(
1182                "Region {} becomes follower while updating manifest which may cause inconsistency, manifest version: {version}",
1183                manifest.metadata.region_id
1184            );
1185        }
1186
1187        Ok(version)
1188    }
1189
    /// Sets the [`RegionRole`].
    ///
    /// ```text
    ///                  +---------------------+
    ///                  |   Staging Leader    |
    ///                  +----------+----------+
    ///                             |
    ///                             v
    ///     +----------+     +------+-------+     +-------------+
    ///     | Follower | <-> |    Leader    | <-> | Downgrading |
    ///     +-----+----+     +------+-------+     +------+------+
    ///           ^                 ^                    |
    ///           +-----------------+--------------------+
    ///
    /// ```
    ///
    /// # State Transitions
    ///
    /// From `Follower`:
    /// - `Follower -> Leader`
    ///
    /// From `Leader`:
    /// - `Leader -> Follower`
    /// - `Leader -> Downgrading Leader`
    ///
    /// From `Staging Leader`:
    /// - `Staging Leader -> Leader`
    /// - `Staging Leader -> Follower`
    /// - `Staging Leader -> Downgrading Leader`
    ///
    /// From `Downgrading Leader`:
    /// - `Downgrading Leader -> Leader`
    /// - `Downgrading Leader -> Follower`
    ///
    /// Every branch first tries to leave the staging state via [`Self::exit_staging`],
    /// which also clears the staging partition info. Only if the region was not staging
    /// does it fall back to the regular transition. A transition that is not allowed is
    /// logged and leaves the state unchanged; this method never returns an error.
    pub(crate) fn set_role(&self, next_role: RegionRole, region_id: RegionId) {
        match next_role {
            RegionRole::Follower => {
                // Staging -> Follower.
                if self
                    .exit_staging(region_id, RegionRoleState::Follower)
                    .is_ok()
                {
                    info!(
                        "Convert region {} to follower, previous role state: {:?}",
                        region_id,
                        RegionRoleState::Leader(RegionLeaderState::Staging)
                    );
                    return;
                }
                // Any non-follower state becomes follower; the closure returns `None`
                // (no update) only when the region already is a follower.
                match self.state.fetch_update(|state| {
                    if !matches!(state, RegionRoleState::Follower) {
                        Some(RegionRoleState::Follower)
                    } else {
                        None
                    }
                }) {
                    Ok(state) => info!(
                        "Convert region {} to follower, previous role state: {:?}",
                        region_id, state
                    ),
                    Err(state) => {
                        if state != RegionRoleState::Follower {
                            warn!(
                                "Failed to convert region {} to follower, current role state: {:?}",
                                region_id, state
                            )
                        }
                    }
                }
            }
            RegionRole::Leader => {
                // Staging -> Leader (writable).
                if self
                    .exit_staging(
                        region_id,
                        RegionRoleState::Leader(RegionLeaderState::Writable),
                    )
                    .is_ok()
                {
                    info!(
                        "Convert region {} to leader, previous role state: {:?}",
                        region_id,
                        RegionRoleState::Leader(RegionLeaderState::Staging)
                    );
                    return;
                }
                // Only a follower or a downgrading leader is switched to writable here.
                match self.state.fetch_update(|state| {
                    if matches!(
                        state,
                        RegionRoleState::Follower
                            | RegionRoleState::Leader(RegionLeaderState::Downgrading)
                    ) {
                        Some(RegionRoleState::Leader(RegionLeaderState::Writable))
                    } else {
                        None
                    }
                }) {
                    Ok(state) => info!(
                        "Convert region {} to leader, previous role state: {:?}",
                        region_id, state
                    ),
                    Err(state) => {
                        // Already being a writable leader is not worth a warning.
                        if state != RegionRoleState::Leader(RegionLeaderState::Writable) {
                            warn!(
                                "Failed to convert region {} to leader, current role state: {:?}",
                                region_id, state
                            )
                        }
                    }
                }
            }
            RegionRole::StagingLeader => {
                // The state is intentionally left untouched here.
                info!(
                    "Ignore direct conversion of region {} to staging leader; staging requires the dedicated workflow",
                    region_id
                );
            }
            RegionRole::DowngradingLeader => {
                // Staging -> Downgrading.
                if self
                    .exit_staging(
                        region_id,
                        RegionRoleState::Leader(RegionLeaderState::Downgrading),
                    )
                    .is_ok()
                {
                    info!(
                        "Convert region {} to downgrading region, previous role state: {:?}",
                        region_id,
                        RegionRoleState::Leader(RegionLeaderState::Staging)
                    );
                    return;
                }
                // Only a writable leader can be downgraded.
                match self.state.compare_exchange(
                    RegionRoleState::Leader(RegionLeaderState::Writable),
                    RegionRoleState::Leader(RegionLeaderState::Downgrading),
                ) {
                    Ok(state) => info!(
                        "Convert region {} to downgrading region, previous role state: {:?}",
                        region_id, state
                    ),
                    Err(state) => {
                        // Already downgrading is not worth a warning.
                        if state != RegionRoleState::Leader(RegionLeaderState::Downgrading) {
                            warn!(
                                "Failed to convert region {} to downgrading leader, current role state: {:?}",
                                region_id, state
                            )
                        }
                    }
                }
            }
        }
    }
1339
1340    /// Returns the normal manifest of the region.
1341    pub(crate) async fn manifest(&self) -> Arc<crate::manifest::action::RegionManifest> {
1342        self.manifest_manager.read().await.manifest()
1343    }
1344
1345    /// Returns the staging manifest of the region.
1346    pub(crate) async fn staging_manifest(
1347        &self,
1348    ) -> Option<Arc<crate::manifest::action::RegionManifest>> {
1349        self.manifest_manager.read().await.staging_manifest()
1350    }
1351}
1352
1353pub(crate) type ManifestContextRef = Arc<ManifestContext>;
1354
/// Regions indexed by ids.
///
/// The map is guarded by a [`RwLock`], so it can be accessed through a shared reference.
#[derive(Debug, Default)]
pub(crate) struct RegionMap {
    /// Region id to region handle.
    regions: RwLock<HashMap<RegionId, MitoRegionRef>>,
}
1360
1361impl RegionMap {
1362    /// Returns true if the region exists.
1363    pub(crate) fn is_region_exists(&self, region_id: RegionId) -> bool {
1364        let regions = self.regions.read().unwrap();
1365        regions.contains_key(&region_id)
1366    }
1367
1368    /// Inserts a new region into the map.
1369    pub(crate) fn insert_region(&self, region: MitoRegionRef) {
1370        let mut regions = self.regions.write().unwrap();
1371        regions.insert(region.region_id, region);
1372    }
1373
1374    /// Gets region by region id.
1375    pub(crate) fn get_region(&self, region_id: RegionId) -> Option<MitoRegionRef> {
1376        let regions = self.regions.read().unwrap();
1377        regions.get(&region_id).cloned()
1378    }
1379
1380    /// Gets writable region by region id.
1381    ///
1382    /// Returns error if the region does not exist or is readonly.
1383    pub(crate) fn writable_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
1384        let region = self
1385            .get_region(region_id)
1386            .context(RegionNotFoundSnafu { region_id })?;
1387        ensure!(
1388            region.is_writable(),
1389            RegionStateSnafu {
1390                region_id,
1391                state: region.state(),
1392                expect: RegionRoleState::Leader(RegionLeaderState::Writable),
1393            }
1394        );
1395        Ok(region)
1396    }
1397
1398    /// Gets readonly region by region id.
1399    ///
1400    /// Returns error if the region does not exist or is writable.
1401    pub(crate) fn follower_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
1402        let region = self
1403            .get_region(region_id)
1404            .context(RegionNotFoundSnafu { region_id })?;
1405        ensure!(
1406            region.is_follower(),
1407            RegionStateSnafu {
1408                region_id,
1409                state: region.state(),
1410                expect: RegionRoleState::Follower,
1411            }
1412        );
1413
1414        Ok(region)
1415    }
1416
1417    /// Gets region by region id.
1418    ///
1419    /// Calls the callback if the region does not exist.
1420    pub(crate) fn get_region_or<F: OnFailure>(
1421        &self,
1422        region_id: RegionId,
1423        cb: &mut F,
1424    ) -> Option<MitoRegionRef> {
1425        match self
1426            .get_region(region_id)
1427            .context(RegionNotFoundSnafu { region_id })
1428        {
1429            Ok(region) => Some(region),
1430            Err(e) => {
1431                cb.on_failure(e);
1432                None
1433            }
1434        }
1435    }
1436
1437    /// Gets writable region by region id.
1438    ///
1439    /// Calls the callback if the region does not exist or is readonly.
1440    pub(crate) fn writable_region_or<F: OnFailure>(
1441        &self,
1442        region_id: RegionId,
1443        cb: &mut F,
1444    ) -> Option<MitoRegionRef> {
1445        match self.writable_region(region_id) {
1446            Ok(region) => Some(region),
1447            Err(e) => {
1448                cb.on_failure(e);
1449                None
1450            }
1451        }
1452    }
1453
1454    /// Gets writable non-staging region by region id.
1455    ///
1456    /// Returns error if the region does not exist, is readonly, or is in staging mode.
1457    pub(crate) fn writable_non_staging_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
1458        let region = self.writable_region(region_id)?;
1459        if region.is_staging() {
1460            return Err(crate::error::RegionStateSnafu {
1461                region_id,
1462                state: region.state(),
1463                expect: RegionRoleState::Leader(RegionLeaderState::Writable),
1464            }
1465            .build());
1466        }
1467        Ok(region)
1468    }
1469
1470    /// Gets staging region by region id.
1471    ///
1472    /// Returns error if the region does not exist or is not in staging state.
1473    pub(crate) fn staging_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
1474        let region = self
1475            .get_region(region_id)
1476            .context(RegionNotFoundSnafu { region_id })?;
1477        ensure!(
1478            region.is_staging(),
1479            RegionStateSnafu {
1480                region_id,
1481                state: region.state(),
1482                expect: RegionRoleState::Leader(RegionLeaderState::Staging),
1483            }
1484        );
1485        Ok(region)
1486    }
1487
1488    /// Gets flushable region by region id.
1489    ///
1490    /// Returns error if the region does not exist or not flushable.
1491    pub(crate) fn flushable_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
1492        let region = self
1493            .get_region(region_id)
1494            .context(RegionNotFoundSnafu { region_id })?;
1495        ensure!(
1496            region.is_flushable(),
1497            FlushableRegionStateSnafu {
1498                region_id,
1499                state: region.state(),
1500            }
1501        );
1502        Ok(region)
1503    }
1504
1505    /// Remove region by id.
1506    pub(crate) fn remove_region(&self, region_id: RegionId) -> Option<MitoRegionRef> {
1507        let mut regions = self.regions.write().unwrap();
1508        regions.remove(&region_id)
1509    }
1510
1511    /// List all regions.
1512    pub(crate) fn list_regions(&self) -> Vec<MitoRegionRef> {
1513        let regions = self.regions.read().unwrap();
1514        regions.values().cloned().collect()
1515    }
1516
1517    /// Clear the map.
1518    pub(crate) fn clear(&self) {
1519        self.regions.write().unwrap().clear();
1520    }
1521}
1522
1523pub(crate) type RegionMapRef = Arc<RegionMap>;
1524
/// Opening regions.
///
/// Tracks, per region id, the senders that are registered for that region.
#[derive(Debug, Default)]
pub(crate) struct OpeningRegions {
    /// Region id to the senders registered for that region.
    regions: RwLock<HashMap<RegionId, Vec<OptionOutputTx>>>,
}
1530
1531impl OpeningRegions {
1532    /// Registers `sender` for an opening region; Otherwise, it returns `None`.
1533    pub(crate) fn wait_for_opening_region(
1534        &self,
1535        region_id: RegionId,
1536        sender: OptionOutputTx,
1537    ) -> Option<OptionOutputTx> {
1538        let mut regions = self.regions.write().unwrap();
1539        match regions.entry(region_id) {
1540            Entry::Occupied(mut senders) => {
1541                senders.get_mut().push(sender);
1542                None
1543            }
1544            Entry::Vacant(_) => Some(sender),
1545        }
1546    }
1547
1548    /// Returns true if the region exists.
1549    pub(crate) fn is_region_exists(&self, region_id: RegionId) -> bool {
1550        let regions = self.regions.read().unwrap();
1551        regions.contains_key(&region_id)
1552    }
1553
1554    /// Inserts a new region into the map.
1555    pub(crate) fn insert_sender(&self, region: RegionId, sender: OptionOutputTx) {
1556        let mut regions = self.regions.write().unwrap();
1557        regions.insert(region, vec![sender]);
1558    }
1559
1560    /// Remove region by id.
1561    pub(crate) fn remove_sender(&self, region_id: RegionId) -> Vec<OptionOutputTx> {
1562        let mut regions = self.regions.write().unwrap();
1563        regions.remove(&region_id).unwrap_or_default()
1564    }
1565
1566    #[cfg(test)]
1567    pub(crate) fn sender_len(&self, region_id: RegionId) -> usize {
1568        let regions = self.regions.read().unwrap();
1569        if let Some(senders) = regions.get(&region_id) {
1570            senders.len()
1571        } else {
1572            0
1573        }
1574    }
1575}
1576
/// Reference-counted handle to an [`OpeningRegions`].
pub(crate) type OpeningRegionsRef = Arc<OpeningRegions>;
1578
1579/// The regions that are catching up.
1580#[derive(Debug, Default)]
1581pub(crate) struct CatchupRegions {
1582    regions: RwLock<HashSet<RegionId>>,
1583}
1584
1585impl CatchupRegions {
1586    /// Returns true if the region exists.
1587    pub(crate) fn is_region_exists(&self, region_id: RegionId) -> bool {
1588        let regions = self.regions.read().unwrap();
1589        regions.contains(&region_id)
1590    }
1591
1592    /// Inserts a new region into the set.
1593    pub(crate) fn insert_region(&self, region_id: RegionId) {
1594        let mut regions = self.regions.write().unwrap();
1595        regions.insert(region_id);
1596    }
1597
1598    /// Remove region by id.
1599    pub(crate) fn remove_region(&self, region_id: RegionId) {
1600        let mut regions = self.regions.write().unwrap();
1601        regions.remove(&region_id);
1602    }
1603}
1604
/// Reference-counted handle to a [`CatchupRegions`].
pub(crate) type CatchupRegionsRef = Arc<CatchupRegions>;
1606
/// Counters describing a region's manifest.
///
/// Each counter sits behind an `Arc`, so clones of this struct read and update the same
/// underlying values.
#[derive(Default, Debug, Clone)]
pub struct ManifestStats {
    pub(crate) total_manifest_size: Arc<AtomicU64>,
    pub(crate) manifest_version: Arc<AtomicU64>,
    pub(crate) file_removed_cnt: Arc<AtomicU64>,
}

impl ManifestStats {
    /// Reads the current value of the `total_manifest_size` counter.
    fn total_manifest_size(&self) -> u64 {
        AtomicU64::load(&self.total_manifest_size, Ordering::Relaxed)
    }

    /// Reads the current value of the `manifest_version` counter.
    fn manifest_version(&self) -> u64 {
        AtomicU64::load(&self.manifest_version, Ordering::Relaxed)
    }

    /// Reads the current value of the `file_removed_cnt` counter.
    fn file_removed_cnt(&self) -> u64 {
        AtomicU64::load(&self.file_removed_cnt, Ordering::Relaxed)
    }
}
1628
1629/// Parses the partition expression from a JSON string.
1630pub fn parse_partition_expr(partition_expr_str: Option<&str>) -> Result<Option<PartitionExpr>> {
1631    match partition_expr_str {
1632        None => Ok(None),
1633        Some("") => Ok(None),
1634        Some(json_str) => {
1635            let expr = partition::expr::PartitionExpr::from_json_str(json_str)
1636                .with_context(|_| InvalidPartitionExprSnafu { expr: json_str })?;
1637            Ok(expr)
1638        }
1639    }
1640}
1641
1642#[cfg(test)]
1643mod tests {
1644    use std::sync::Arc;
1645    use std::sync::atomic::AtomicU64;
1646
1647    use common_datasource::compression::CompressionType;
1648    use common_test_util::temp_dir::create_temp_dir;
1649    use crossbeam_utils::atomic::AtomicCell;
1650    use object_store::ObjectStore;
1651    use object_store::services::Fs;
1652    use store_api::logstore::provider::Provider;
1653    use store_api::region_engine::RegionRole;
1654    use store_api::region_request::PathType;
1655    use store_api::storage::{FileId, RegionId};
1656
1657    use crate::access_layer::AccessLayer;
1658    use crate::error::Error;
1659    use crate::manifest::action::{
1660        RegionChange, RegionEdit, RegionMetaAction, RegionMetaActionList, RegionPartitionExprChange,
1661    };
1662    use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
1663    use crate::region::{
1664        ManifestContext, ManifestStats, MitoRegion, RegionLeaderState, RegionRoleState,
1665    };
1666    use crate::sst::FormatType;
1667    use crate::sst::index::intermediate::IntermediateManager;
1668    use crate::sst::index::puffin_manager::PuffinManagerFactory;
1669    use crate::test_util::scheduler_util::SchedulerEnv;
1670    use crate::test_util::version_util::VersionControlBuilder;
1671    use crate::time_provider::StdTimeProvider;
1672
1673    #[test]
1674    fn test_region_state_lock_free() {
1675        assert!(AtomicCell::<RegionRoleState>::is_lock_free());
1676    }
1677
1678    #[test]
1679    fn test_region_role_state_as_str() {
1680        assert_eq!("Follower", RegionRoleState::Follower.as_str());
1681        assert_eq!(
1682            "Leader(Writable)",
1683            RegionRoleState::Leader(RegionLeaderState::Writable).as_str()
1684        );
1685        assert_eq!(
1686            "Leader(Staging)",
1687            RegionRoleState::Leader(RegionLeaderState::Staging).as_str()
1688        );
1689        assert_eq!(
1690            "Leader(Downgrading)",
1691            RegionRoleState::Leader(RegionLeaderState::Downgrading).as_str()
1692        );
1693    }
1694
1695    async fn build_test_region(env: &SchedulerEnv) -> MitoRegion {
1696        let builder = VersionControlBuilder::new();
1697        let version_control = Arc::new(builder.build());
1698        let metadata = version_control.current().version.metadata.clone();
1699
1700        let manager = RegionManifestManager::new(
1701            metadata.clone(),
1702            0,
1703            RegionManifestOptions {
1704                manifest_dir: "".to_string(),
1705                object_store: env.access_layer.object_store().clone(),
1706                compress_type: CompressionType::Uncompressed,
1707                checkpoint_distance: 10,
1708                remove_file_options: Default::default(),
1709                manifest_cache: None,
1710            },
1711            FormatType::PrimaryKey,
1712            &Default::default(),
1713        )
1714        .await
1715        .unwrap();
1716
1717        let manifest_ctx = Arc::new(ManifestContext::new(
1718            manager,
1719            RegionRoleState::Leader(RegionLeaderState::Writable),
1720        ));
1721
1722        MitoRegion {
1723            region_id: metadata.region_id,
1724            version_control,
1725            access_layer: env.access_layer.clone(),
1726            manifest_ctx,
1727            file_purger: crate::test_util::new_noop_file_purger(),
1728            provider: Provider::noop_provider(),
1729            last_flush_millis: Default::default(),
1730            last_compaction_millis: Default::default(),
1731            time_provider: Arc::new(StdTimeProvider),
1732            topic_latest_entry_id: Default::default(),
1733            written_bytes: Arc::new(AtomicU64::new(0)),
1734            stats: ManifestStats::default(),
1735        }
1736    }
1737
1738    fn empty_edit() -> RegionEdit {
1739        RegionEdit {
1740            files_to_add: Vec::new(),
1741            files_to_remove: Vec::new(),
1742            timestamp_ms: None,
1743            compaction_time_window: None,
1744            flushed_entry_id: None,
1745            flushed_sequence: None,
1746            committed_sequence: None,
1747        }
1748    }
1749
1750    #[tokio::test]
1751    async fn test_compaction_update_manifest_allows_editing_state() {
1752        let env = SchedulerEnv::new().await;
1753        let region = build_test_region(&env).await;
1754        region.set_editing(RegionLeaderState::Writable).unwrap();
1755
1756        let file_id = FileId::random();
1757        let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(RegionEdit {
1758            files_to_add: vec![crate::sst::file::FileMeta {
1759                region_id: region.region_id,
1760                file_id,
1761                level: 1,
1762                ..Default::default()
1763            }],
1764            files_to_remove: Vec::new(),
1765            timestamp_ms: None,
1766            compaction_time_window: None,
1767            flushed_entry_id: None,
1768            flushed_sequence: None,
1769            committed_sequence: None,
1770        }));
1771
1772        region
1773            .manifest_ctx
1774            .update_manifest_for_compaction(action_list)
1775            .await
1776            .unwrap();
1777
1778        assert!(
1779            region
1780                .manifest_ctx
1781                .manifest()
1782                .await
1783                .files
1784                .contains_key(&file_id)
1785        );
1786    }
1787
1788    #[tokio::test]
1789    async fn test_exit_staging_partition_expr_change_and_edit_success() {
1790        let env = SchedulerEnv::new().await;
1791        let region = build_test_region(&env).await;
1792
1793        let mut manager = region.manifest_ctx.manifest_manager.write().await;
1794        region.set_staging(&mut manager).await.unwrap();
1795        manager
1796            .update(
1797                RegionMetaActionList::new(vec![
1798                    RegionMetaAction::PartitionExprChange(RegionPartitionExprChange {
1799                        partition_expr: Some("expr_a".to_string()),
1800                    }),
1801                    RegionMetaAction::Edit(empty_edit()),
1802                ]),
1803                true,
1804            )
1805            .await
1806            .unwrap();
1807
1808        region.exit_staging_on_success(&mut manager).await.unwrap();
1809        drop(manager);
1810
1811        assert_eq!(
1812            region.version().metadata.partition_expr.as_deref(),
1813            Some("expr_a")
1814        );
1815        assert_eq!(
1816            region.state(),
1817            RegionRoleState::Leader(RegionLeaderState::Writable)
1818        );
1819    }
1820
1821    #[tokio::test]
1822    async fn test_exit_staging_change_with_same_columns_success() {
1823        let env = SchedulerEnv::new().await;
1824        let region = build_test_region(&env).await;
1825
1826        let mut manager = region.manifest_ctx.manifest_manager.write().await;
1827        region.set_staging(&mut manager).await.unwrap();
1828
1829        let mut changed_metadata = region.version().metadata.as_ref().clone();
1830        changed_metadata.set_partition_expr(Some("expr_b".to_string()));
1831
1832        manager
1833            .update(
1834                RegionMetaActionList::new(vec![
1835                    RegionMetaAction::Change(RegionChange {
1836                        metadata: Arc::new(changed_metadata),
1837                        sst_format: FormatType::PrimaryKey,
1838                        append_mode: None,
1839                    }),
1840                    RegionMetaAction::Edit(empty_edit()),
1841                ]),
1842                true,
1843            )
1844            .await
1845            .unwrap();
1846
1847        region.exit_staging_on_success(&mut manager).await.unwrap();
1848        drop(manager);
1849
1850        assert_eq!(
1851            region.version().metadata.partition_expr.as_deref(),
1852            Some("expr_b")
1853        );
1854        assert_eq!(
1855            region.state(),
1856            RegionRoleState::Leader(RegionLeaderState::Writable)
1857        );
1858    }
1859
1860    #[tokio::test]
1861    async fn test_exit_staging_change_with_different_columns_fails() {
1862        let env = SchedulerEnv::new().await;
1863        let region = build_test_region(&env).await;
1864
1865        let mut manager = region.manifest_ctx.manifest_manager.write().await;
1866        region.set_staging(&mut manager).await.unwrap();
1867
1868        let mut changed_metadata = region.version().metadata.as_ref().clone();
1869        changed_metadata.column_metadatas.rotate_left(1);
1870
1871        manager
1872            .update(
1873                RegionMetaActionList::new(vec![
1874                    RegionMetaAction::Change(RegionChange {
1875                        metadata: Arc::new(changed_metadata),
1876                        sst_format: FormatType::PrimaryKey,
1877                        append_mode: None,
1878                    }),
1879                    RegionMetaAction::Edit(empty_edit()),
1880                ]),
1881                true,
1882            )
1883            .await
1884            .unwrap();
1885
1886        let result = region.exit_staging_on_success(&mut manager).await;
1887        assert!(matches!(result, Err(Error::Unexpected { .. })));
1888    }
1889
1890    #[tokio::test]
1891    async fn test_exit_staging_partition_expr_change_and_change_conflict_fails() {
1892        let env = SchedulerEnv::new().await;
1893        let region = build_test_region(&env).await;
1894
1895        let mut manager = region.manifest_ctx.manifest_manager.write().await;
1896        region.set_staging(&mut manager).await.unwrap();
1897
1898        let mut changed_metadata = region.version().metadata.as_ref().clone();
1899        changed_metadata.set_partition_expr(Some("expr_c".to_string()));
1900
1901        manager
1902            .update(
1903                RegionMetaActionList::new(vec![
1904                    RegionMetaAction::PartitionExprChange(RegionPartitionExprChange {
1905                        partition_expr: Some("expr_c".to_string()),
1906                    }),
1907                    RegionMetaAction::Change(RegionChange {
1908                        metadata: Arc::new(changed_metadata),
1909                        sst_format: FormatType::PrimaryKey,
1910                        append_mode: None,
1911                    }),
1912                    RegionMetaAction::Edit(empty_edit()),
1913                ]),
1914                true,
1915            )
1916            .await
1917            .unwrap();
1918
1919        let result = region.exit_staging_on_success(&mut manager).await;
1920        assert!(matches!(result, Err(Error::Unexpected { .. })));
1921    }
1922
1923    #[tokio::test]
1924    async fn test_set_region_state() {
1925        let env = SchedulerEnv::new().await;
1926        let builder = VersionControlBuilder::new();
1927        let version_control = Arc::new(builder.build());
1928        let manifest_ctx = env
1929            .mock_manifest_context(version_control.current().version.metadata.clone())
1930            .await;
1931
1932        let region_id = RegionId::new(1024, 0);
1933        // Leader -> Follower
1934        manifest_ctx.set_role(RegionRole::Follower, region_id);
1935        assert_eq!(manifest_ctx.state.load(), RegionRoleState::Follower);
1936
1937        // Follower -> Leader
1938        manifest_ctx.set_role(RegionRole::Leader, region_id);
1939        assert_eq!(
1940            manifest_ctx.state.load(),
1941            RegionRoleState::Leader(RegionLeaderState::Writable)
1942        );
1943
1944        // Direct Leader -> StagingLeader should be ignored.
1945        manifest_ctx.set_role(RegionRole::StagingLeader, region_id);
1946        assert_eq!(
1947            manifest_ctx.state.load(),
1948            RegionRoleState::Leader(RegionLeaderState::Writable)
1949        );
1950
1951        // Leader -> Downgrading Leader
1952        manifest_ctx.set_role(RegionRole::DowngradingLeader, region_id);
1953        assert_eq!(
1954            manifest_ctx.state.load(),
1955            RegionRoleState::Leader(RegionLeaderState::Downgrading)
1956        );
1957
1958        // Downgrading Leader -> Follower
1959        manifest_ctx.set_role(RegionRole::Follower, region_id);
1960        assert_eq!(manifest_ctx.state.load(), RegionRoleState::Follower);
1961
1962        // Can't downgrade from follower (Follower -> Downgrading Leader)
1963        manifest_ctx.set_role(RegionRole::DowngradingLeader, region_id);
1964        assert_eq!(manifest_ctx.state.load(), RegionRoleState::Follower);
1965
1966        // Set region role too Downgrading Leader
1967        manifest_ctx.set_role(RegionRole::Leader, region_id);
1968        manifest_ctx.set_role(RegionRole::DowngradingLeader, region_id);
1969        assert_eq!(
1970            manifest_ctx.state.load(),
1971            RegionRoleState::Leader(RegionLeaderState::Downgrading)
1972        );
1973
1974        // Downgrading Leader -> Leader
1975        manifest_ctx.set_role(RegionRole::Leader, region_id);
1976        assert_eq!(
1977            manifest_ctx.state.load(),
1978            RegionRoleState::Leader(RegionLeaderState::Writable)
1979        );
1980    }
1981
1982    #[tokio::test]
1983    async fn test_staging_state_validation() {
1984        let env = SchedulerEnv::new().await;
1985        let builder = VersionControlBuilder::new();
1986        let version_control = Arc::new(builder.build());
1987
1988        // Create context with staging state using the correct pattern from SchedulerEnv
1989        let staging_ctx = {
1990            let manager = RegionManifestManager::new(
1991                version_control.current().version.metadata.clone(),
1992                0,
1993                RegionManifestOptions {
1994                    manifest_dir: "".to_string(),
1995                    object_store: env.access_layer.object_store().clone(),
1996                    compress_type: CompressionType::Uncompressed,
1997                    checkpoint_distance: 10,
1998                    remove_file_options: Default::default(),
1999                    manifest_cache: None,
2000                },
2001                FormatType::PrimaryKey,
2002                &Default::default(),
2003            )
2004            .await
2005            .unwrap();
2006            Arc::new(ManifestContext::new(
2007                manager,
2008                RegionRoleState::Leader(RegionLeaderState::Staging),
2009            ))
2010        };
2011
2012        // Test staging state behavior
2013        assert_eq!(
2014            staging_ctx.current_state(),
2015            RegionRoleState::Leader(RegionLeaderState::Staging)
2016        );
2017
2018        // Test writable context for comparison
2019        let writable_ctx = env
2020            .mock_manifest_context(version_control.current().version.metadata.clone())
2021            .await;
2022
2023        assert_eq!(
2024            writable_ctx.current_state(),
2025            RegionRoleState::Leader(RegionLeaderState::Writable)
2026        );
2027    }
2028
    /// Walks a region through Writable -> Staging -> Writable, checks that entering staging
    /// removes staging manifests left behind earlier, and verifies that repeating a
    /// transition (`set_staging` while staging, `exit_staging` while writable) is rejected.
    #[tokio::test]
    async fn test_staging_state_transitions() {
        let builder = VersionControlBuilder::new();
        let version_control = Arc::new(builder.build());
        let metadata = version_control.current().version.metadata.clone();

        // Create MitoRegion for testing state transitions
        let temp_dir = create_temp_dir("");
        let path_str = temp_dir.path().display().to_string();
        let fs_builder = Fs::default().root(&path_str);
        let object_store = ObjectStore::new(fs_builder).unwrap().finish();

        // Index helpers share an `index_aux` directory under the same temp dir.
        let index_aux_path = temp_dir.path().join("index_aux");
        let puffin_mgr = PuffinManagerFactory::new(&index_aux_path, 4096, None, None)
            .await
            .unwrap();
        let intm_mgr = IntermediateManager::init_fs(index_aux_path.to_str().unwrap())
            .await
            .unwrap();

        let access_layer = Arc::new(AccessLayer::new(
            "",
            PathType::Bare,
            object_store,
            puffin_mgr,
            intm_mgr,
        ));

        let manager = RegionManifestManager::new(
            metadata.clone(),
            0,
            RegionManifestOptions {
                manifest_dir: "".to_string(),
                object_store: access_layer.object_store().clone(),
                compress_type: CompressionType::Uncompressed,
                checkpoint_distance: 10,
                remove_file_options: Default::default(),
                manifest_cache: None,
            },
            FormatType::PrimaryKey,
            &Default::default(),
        )
        .await
        .unwrap();

        // The region starts out as a writable leader.
        let manifest_ctx = Arc::new(ManifestContext::new(
            manager,
            RegionRoleState::Leader(RegionLeaderState::Writable),
        ));

        // Keep a second handle to the context so the test can take the manifest lock directly.
        let region = MitoRegion {
            region_id: metadata.region_id,
            version_control,
            access_layer,
            manifest_ctx: manifest_ctx.clone(),
            file_purger: crate::test_util::new_noop_file_purger(),
            provider: Provider::noop_provider(),
            last_flush_millis: Default::default(),
            last_compaction_millis: Default::default(),
            time_provider: Arc::new(StdTimeProvider),
            topic_latest_entry_id: Default::default(),
            written_bytes: Arc::new(AtomicU64::new(0)),
            stats: ManifestStats::default(),
        };

        // Test initial state
        assert_eq!(
            region.state(),
            RegionRoleState::Leader(RegionLeaderState::Writable)
        );
        assert!(!region.is_staging());

        // Test transition to staging
        let mut manager = manifest_ctx.manifest_manager.write().await;
        region.set_staging(&mut manager).await.unwrap();
        // Release the manifest write guard; later steps acquire it again.
        drop(manager);
        assert_eq!(
            region.state(),
            RegionRoleState::Leader(RegionLeaderState::Staging)
        );
        assert!(region.is_staging());

        // Test transition back to writable
        region.exit_staging().unwrap();
        assert_eq!(
            region.state(),
            RegionRoleState::Leader(RegionLeaderState::Writable)
        );
        assert!(!region.is_staging());

        // Test staging directory cleanup: Create dirty staging files before entering staging mode
        {
            // Create some dummy staging manifest files to simulate interrupted session
            let manager = manifest_ctx.manifest_manager.write().await;
            let dummy_actions = RegionMetaActionList::new(vec![]);
            let dummy_bytes = dummy_actions.encode().unwrap();

            // Create dirty staging files with versions 100 and 101
            // NOTE(review): the trailing `true` presumably selects the staging location;
            // confirm against the store's `save` signature.
            manager.store().save(100, &dummy_bytes, true).await.unwrap();
            manager.store().save(101, &dummy_bytes, true).await.unwrap();
            drop(manager);

            // Verify dirty files exist before entering staging
            let manager = manifest_ctx.manifest_manager.read().await;
            let dirty_manifests = manager.store().fetch_staging_manifests().await.unwrap();
            assert_eq!(
                dirty_manifests.len(),
                2,
                "Should have 2 dirty staging files"
            );
            drop(manager);

            // Enter staging mode - this should clean up the dirty files
            let mut manager = manifest_ctx.manifest_manager.write().await;
            region.set_staging(&mut manager).await.unwrap();
            drop(manager);

            // Verify dirty files are cleaned up after entering staging
            let manager = manifest_ctx.manifest_manager.read().await;
            let cleaned_manifests = manager.store().fetch_staging_manifests().await.unwrap();
            assert_eq!(
                cleaned_manifests.len(),
                0,
                "Dirty staging files should be cleaned up"
            );
            drop(manager);

            // Exit staging to restore normal state for remaining tests
            region.exit_staging().unwrap();
        }

        // Test invalid transitions
        let mut manager = manifest_ctx.manifest_manager.write().await;
        assert!(region.set_staging(&mut manager).await.is_ok()); // Writable -> Staging should work
        drop(manager);
        let mut manager = manifest_ctx.manifest_manager.write().await;
        assert!(region.set_staging(&mut manager).await.is_err()); // Staging -> Staging should fail
        drop(manager);
        assert!(region.exit_staging().is_ok()); // Staging -> Writable should work
        assert!(region.exit_staging().is_err()); // Writable -> Writable should fail
    }
2170}