Skip to main content

mito2/
region.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Mito region.
16
17pub mod catchup;
18pub mod opener;
19pub mod options;
20pub mod utils;
21pub(crate) mod version;
22
23use std::collections::hash_map::Entry;
24use std::collections::{HashMap, HashSet};
25use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
26use std::sync::{Arc, Mutex, RwLock};
27
28use common_base::hash::partition_expr_version;
29use common_telemetry::{error, info, warn};
30use crossbeam_utils::atomic::AtomicCell;
31use partition::expr::PartitionExpr;
32use snafu::{OptionExt, ResultExt, ensure};
33use store_api::ManifestVersion;
34use store_api::codec::PrimaryKeyEncoding;
35use store_api::logstore::provider::Provider;
36use store_api::metadata::RegionMetadataRef;
37use store_api::metrics::{REGION_QUERY_CPU_TIME, REGION_QUERY_SCANNED_BYTES};
38use store_api::region_engine::{
39    RegionManifestInfo, RegionRole, RegionStatistic, SettableRegionRoleState,
40};
41use store_api::region_info::RegionInfoEntry;
42use store_api::region_request::{PathType, StagingPartitionDirective};
43use store_api::sst_entry::ManifestSstEntry;
44use store_api::storage::{FileId, RegionId, SequenceNumber};
45use tokio::sync::RwLockWriteGuard;
46pub use utils::*;
47
48use crate::access_layer::AccessLayerRef;
49use crate::engine::region_hook::{PendingManifestHook, RegionHookRef};
50use crate::error::{
51    FlushableRegionStateSnafu, InvalidPartitionExprSnafu, RegionNotFoundSnafu, RegionStateSnafu,
52    RegionTruncatedSnafu, Result, UnexpectedSnafu, UpdateManifestSnafu,
53};
54use crate::manifest::action::{
55    RegionChange, RegionManifest, RegionMetaAction, RegionMetaActionList,
56};
57use crate::manifest::manager::RegionManifestManager;
58use crate::region::version::{VersionControlRef, VersionRef};
59use crate::request::{OnFailure, OptionOutputTx};
60use crate::sst::file::FileMeta;
61use crate::sst::file_purger::FilePurgerRef;
62use crate::sst::location::{index_file_path, sst_file_path};
63use crate::time_provider::TimeProviderRef;
64
/// Approximate ratio between the WAL size and the memtable size of a region.
///
/// The memtable usage is multiplied by this factor to estimate the WAL usage.
const ESTIMATED_WAL_FACTOR: f32 = 0.42825;
67
68/// Region status include region id, memtable usage, sst usage, wal usage and manifest usage.
69#[derive(Debug)]
70pub struct RegionUsage {
71    pub region_id: RegionId,
72    pub wal_usage: u64,
73    pub sst_usage: u64,
74    pub manifest_usage: u64,
75}
76
77impl RegionUsage {
78    pub fn disk_usage(&self) -> u64 {
79        self.wal_usage + self.sst_usage + self.manifest_usage
80    }
81}
82
/// Fine-grained state of a region that currently acts as a leader.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RegionLeaderState {
    /// The region is open and accepts writes.
    Writable,
    /// The region is in staging mode: writable, but without checkpoint/compaction.
    Staging,
    /// The region is on its way into staging mode; write requests are stalled.
    EnteringStaging,
    /// An alter operation is in progress.
    Altering,
    /// The region is being dropped.
    Dropping,
    /// The region is being truncated.
    Truncating,
    /// A region edit is being handled.
    Editing,
    /// The region is stepping down from leadership.
    Downgrading,
}

/// Role of a region combined with its leader sub-state, if any.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RegionRoleState {
    /// The region is a leader in the given leader state.
    Leader(RegionLeaderState),
    /// The region is a follower.
    Follower,
}

impl RegionRoleState {
    /// Extracts the leader state, or returns `None` for a follower.
    pub fn into_leader_state(self) -> Option<RegionLeaderState> {
        if let Self::Leader(leader_state) = self {
            Some(leader_state)
        } else {
            None
        }
    }

    /// Returns a static, human-readable name of the state, e.g. `"Leader(Writable)"`.
    pub(crate) fn as_str(&self) -> &'static str {
        let leader_state = match self {
            Self::Follower => return "Follower",
            Self::Leader(leader_state) => leader_state,
        };

        match leader_state {
            RegionLeaderState::Writable => "Leader(Writable)",
            RegionLeaderState::Staging => "Leader(Staging)",
            RegionLeaderState::EnteringStaging => "Leader(EnteringStaging)",
            RegionLeaderState::Altering => "Leader(Altering)",
            RegionLeaderState::Dropping => "Leader(Dropping)",
            RegionLeaderState::Truncating => "Leader(Truncating)",
            RegionLeaderState::Editing => "Leader(Editing)",
            RegionLeaderState::Downgrading => "Leader(Downgrading)",
        }
    }
}
134
/// Metadata and runtime status of a region.
///
/// Writing and reading a region follow a single-writer-multi-reader rule:
/// - Only the region worker thread this region belongs to can modify the metadata.
/// - Multiple reader threads are allowed to read a specific `version` of a region.
#[derive(Debug)]
pub struct MitoRegion {
    /// Id of this region.
    ///
    /// Accessing region id from the version control is inconvenient so
    /// we also store it here.
    pub(crate) region_id: RegionId,

    /// Version controller for this region.
    ///
    /// We MUST update the version control inside the write lock of the region manifest manager.
    pub(crate) version_control: VersionControlRef,
    /// SSTs accessor for this region. Also provides the table dir and path type.
    pub(crate) access_layer: AccessLayerRef,
    /// Context to maintain manifest for this region. It also holds the region role state.
    pub(crate) manifest_ctx: ManifestContextRef,
    /// SST file purger.
    pub(crate) file_purger: FilePurgerRef,
    /// The provider of log store.
    pub(crate) provider: Provider,
    /// Last flush time in millis, as reported by `time_provider`.
    last_flush_millis: AtomicI64,
    /// Last schedule compaction time in millis, as reported by `time_provider`.
    last_schedule_compaction_millis: AtomicI64,
    /// Provider to get current time.
    time_provider: TimeProviderRef,
    /// The topic's latest entry id since the region's last flushing.
    /// **Only used for remote WAL pruning.**
    ///
    /// The value will be updated to the latest offset of the topic
    /// if region receives a flush request or schedules a periodic flush task
    /// and the region's memtable is empty.
    ///
    /// There are no WAL entries in range [flushed_entry_id, topic_latest_entry_id] for current region,
    /// which means these WAL entries may be able to be pruned up to `topic_latest_entry_id`.
    pub(crate) topic_latest_entry_id: AtomicU64,
    /// The total bytes written to the region.
    pub(crate) written_bytes: Arc<AtomicU64>,
    /// Manifest statistics: the source of the manifest size, manifest version and
    /// removed-file count reported in the region statistic.
    stats: ManifestStats,
}
181
/// Shared, reference-counted handle to a [`MitoRegion`].
pub type MitoRegionRef = Arc<MitoRegion>;
183
184#[derive(Debug, Clone)]
185pub(crate) struct StagingPartitionInfo {
186    pub(crate) partition_directive: StagingPartitionDirective,
187    pub(crate) partition_rule_version: u64,
188}
189
190impl StagingPartitionInfo {
191    /// Returns the partition expression carried by the staging directive, if any.
192    pub(crate) fn partition_expr(&self) -> Option<&str> {
193        self.partition_directive.partition_expr()
194    }
195
196    /// Builds staging partition info from a directive and derives its version marker.
197    pub(crate) fn from_partition_directive(partition_directive: StagingPartitionDirective) -> Self {
198        let partition_rule_version = match &partition_directive {
199            StagingPartitionDirective::UpdatePartitionExpr(expr) => {
200                partition_expr_version(Some(expr))
201            }
202            StagingPartitionDirective::RejectAllWrites => 0,
203        };
204        Self {
205            partition_directive,
206            partition_rule_version,
207        }
208    }
209}
210
211impl MitoRegion {
212    fn remove_region_metrics(&self) {
213        let region_id = self.region_id.as_u64().to_string();
214        let labels = &[region_id.as_str()];
215        let _ = REGION_QUERY_CPU_TIME.remove_label_values(labels);
216        let _ = REGION_QUERY_SCANNED_BYTES.remove_label_values(labels);
217    }
218
219    /// Stop background managers for this region.
220    pub(crate) async fn stop(&self) {
221        self.manifest_ctx
222            .manifest_manager
223            .write()
224            .await
225            .stop()
226            .await;
227
228        info!(
229            "Stopped region manifest manager, region_id: {}",
230            self.region_id
231        );
232    }
233
234    /// Returns current metadata of the region.
235    pub fn metadata(&self) -> RegionMetadataRef {
236        let version_data = self.version_control.current();
237        version_data.version.metadata.clone()
238    }
239
240    /// Returns primary key encoding of the region.
241    pub(crate) fn primary_key_encoding(&self) -> PrimaryKeyEncoding {
242        let version_data = self.version_control.current();
243        version_data.version.metadata.primary_key_encoding
244    }
245
246    /// Returns current version of the region.
247    pub(crate) fn version(&self) -> VersionRef {
248        let version_data = self.version_control.current();
249        version_data.version
250    }
251
252    /// Returns last flush timestamp in millis.
253    pub(crate) fn last_flush_millis(&self) -> i64 {
254        self.last_flush_millis.load(Ordering::Relaxed)
255    }
256
257    /// Update flush time to current time.
258    pub(crate) fn update_flush_millis(&self) {
259        let now = self.time_provider.current_time_millis();
260        self.last_flush_millis.store(now, Ordering::Relaxed);
261    }
262
263    /// Returns last schedule compaction timestamp in millis.
264    pub(crate) fn last_schedule_compaction_millis(&self) -> i64 {
265        self.last_schedule_compaction_millis.load(Ordering::Relaxed)
266    }
267
268    /// Update schedule compaction time to current time.
269    pub(crate) fn update_schedule_compaction_millis(&self) {
270        let now = self.time_provider.current_time_millis();
271        self.last_schedule_compaction_millis
272            .store(now, Ordering::Relaxed);
273    }
274
275    /// Returns the table dir.
276    pub(crate) fn table_dir(&self) -> &str {
277        self.access_layer.table_dir()
278    }
279
280    /// Returns the path type of the region.
281    pub(crate) fn path_type(&self) -> PathType {
282        self.access_layer.path_type()
283    }
284
285    /// Returns whether the region is writable.
286    pub(crate) fn is_writable(&self) -> bool {
287        matches!(
288            self.manifest_ctx.state.load(),
289            RegionRoleState::Leader(RegionLeaderState::Writable)
290                | RegionRoleState::Leader(RegionLeaderState::Staging)
291        )
292    }
293
294    /// Returns whether the region is flushable.
295    pub(crate) fn is_flushable(&self) -> bool {
296        matches!(
297            self.manifest_ctx.state.load(),
298            RegionRoleState::Leader(RegionLeaderState::Writable)
299                | RegionRoleState::Leader(RegionLeaderState::Staging)
300                | RegionRoleState::Leader(RegionLeaderState::Downgrading)
301        )
302    }
303
304    /// Returns whether the region should abort index building.
305    pub(crate) fn should_abort_index(&self) -> bool {
306        matches!(
307            self.manifest_ctx.state.load(),
308            RegionRoleState::Follower
309                | RegionRoleState::Leader(RegionLeaderState::Dropping)
310                | RegionRoleState::Leader(RegionLeaderState::Truncating)
311                | RegionRoleState::Leader(RegionLeaderState::Downgrading)
312                | RegionRoleState::Leader(RegionLeaderState::Staging)
313        )
314    }
315
316    /// Returns whether the region is downgrading.
317    pub(crate) fn is_downgrading(&self) -> bool {
318        matches!(
319            self.manifest_ctx.state.load(),
320            RegionRoleState::Leader(RegionLeaderState::Downgrading)
321        )
322    }
323
324    /// Returns whether the region is in staging mode.
325    pub(crate) fn is_staging(&self) -> bool {
326        self.manifest_ctx.state.load() == RegionRoleState::Leader(RegionLeaderState::Staging)
327    }
328
329    /// Returns whether the region is entering staging mode.
330    pub(crate) fn is_enter_staging(&self) -> bool {
331        self.manifest_ctx.state.load()
332            == RegionRoleState::Leader(RegionLeaderState::EnteringStaging)
333    }
334
335    pub fn region_id(&self) -> RegionId {
336        self.region_id
337    }
338
339    pub fn find_committed_sequence(&self) -> SequenceNumber {
340        self.version_control.committed_sequence()
341    }
342
343    /// Returns the latest sequence that has already been persisted into SSTs.
344    ///
345    /// Incremental memtable-only reads must use a cursor greater than or equal to
346    /// this boundary; older cursors are stale because the corresponding updates may
347    /// already have been flushed out of memtables.
348    pub fn flushed_sequence(&self) -> SequenceNumber {
349        self.version_control.current().version.flushed_sequence
350    }
351
352    /// Returns whether the region is readonly.
353    pub fn is_follower(&self) -> bool {
354        self.manifest_ctx.state.load() == RegionRoleState::Follower
355    }
356
357    /// Returns the state of the region.
358    pub(crate) fn state(&self) -> RegionRoleState {
359        self.manifest_ctx.state.load()
360    }
361
362    /// Sets the region role state.
363    pub(crate) fn set_role(&self, next_role: RegionRole) {
364        self.manifest_ctx.set_role(next_role, self.region_id);
365    }
366
367    pub(crate) fn region_role(&self) -> RegionRole {
368        match self.state() {
369            RegionRoleState::Follower => RegionRole::Follower,
370            RegionRoleState::Leader(RegionLeaderState::Staging) => RegionRole::StagingLeader,
371            RegionRoleState::Leader(RegionLeaderState::Downgrading) => {
372                RegionRole::DowngradingLeader
373            }
374            RegionRoleState::Leader(_) => RegionRole::Leader,
375        }
376    }
377
378    /// Sets the altering state.
379    /// You should call this method in the worker loop.
380    pub(crate) fn set_altering(&self) -> Result<()> {
381        self.compare_exchange_state(
382            RegionLeaderState::Writable,
383            RegionRoleState::Leader(RegionLeaderState::Altering),
384        )
385    }
386
387    /// Sets the dropping state.
388    /// You should call this method in the worker loop.
389    pub(crate) fn set_dropping(&self, expect: RegionLeaderState) -> Result<()> {
390        self.compare_exchange_state(expect, RegionRoleState::Leader(RegionLeaderState::Dropping))
391    }
392
393    /// Sets the truncating state.
394    /// You should call this method in the worker loop.
395    pub(crate) fn set_truncating(&self) -> Result<()> {
396        self.compare_exchange_state(
397            RegionLeaderState::Writable,
398            RegionRoleState::Leader(RegionLeaderState::Truncating),
399        )
400    }
401
402    /// Sets the editing state.
403    /// You should call this method in the worker loop.
404    pub(crate) fn set_editing(&self, expect: RegionLeaderState) -> Result<()> {
405        self.compare_exchange_state(expect, RegionRoleState::Leader(RegionLeaderState::Editing))
406    }
407
408    /// Sets the staging state.
409    ///
410    /// You should call this method in the worker loop.
411    /// Transitions from Writable to Staging state.
412    /// Cleans any existing staging manifests before entering staging mode.
413    pub(crate) async fn set_staging(
414        &self,
415        manager: &mut RwLockWriteGuard<'_, RegionManifestManager>,
416    ) -> Result<()> {
417        manager.store().clear_staging_manifests().await?;
418
419        self.compare_exchange_state(
420            RegionLeaderState::Writable,
421            RegionRoleState::Leader(RegionLeaderState::Staging),
422        )
423    }
424
425    /// Sets the entering staging state.
426    pub(crate) fn set_entering_staging(&self) -> Result<()> {
427        self.compare_exchange_state(
428            RegionLeaderState::Writable,
429            RegionRoleState::Leader(RegionLeaderState::EnteringStaging),
430        )
431    }
432
433    /// Exits the staging state back to writable.
434    ///
435    /// You should call this method in the worker loop.
436    /// Transitions from Staging to Writable state.
437    pub fn exit_staging(&self) -> Result<()> {
438        self.manifest_ctx.exit_staging(
439            self.region_id,
440            RegionRoleState::Leader(RegionLeaderState::Writable),
441        )
442    }
443
444    /// Sets the region role state gracefully. This acquires the manifest write lock.
445    pub(crate) async fn set_role_state_gracefully(
446        &self,
447        state: SettableRegionRoleState,
448    ) -> Result<()> {
449        let mut manager: RwLockWriteGuard<'_, RegionManifestManager> =
450            self.manifest_ctx.manifest_manager.write().await;
451        let current_state = self.state();
452
453        let hook_payload: Option<PendingManifestHook> = match state {
454            SettableRegionRoleState::Leader => {
455                // Exit staging mode and return to normal writable leader
456                // Only allowed from staging state
457                match current_state {
458                    RegionRoleState::Leader(RegionLeaderState::Staging) => {
459                        info!("Exiting staging mode for region {}", self.region_id);
460                        // Use the success exit path that merges all staged manifests
461                        self.exit_staging_on_success(&mut manager).await?
462                    }
463                    RegionRoleState::Leader(RegionLeaderState::Writable) => {
464                        // Already in desired state - no-op
465                        info!("Region {} already in normal leader mode", self.region_id);
466                        None
467                    }
468                    _ => {
469                        // Only staging -> leader transition is allowed
470                        return Err(RegionStateSnafu {
471                            region_id: self.region_id,
472                            state: current_state,
473                            expect: RegionRoleState::Leader(RegionLeaderState::Staging),
474                        }
475                        .build());
476                    }
477                }
478            }
479
480            SettableRegionRoleState::StagingLeader => {
481                // Enter staging mode from normal writable leader
482                // Only allowed from writable leader state
483                match current_state {
484                    RegionRoleState::Leader(RegionLeaderState::Writable) => {
485                        info!("Entering staging mode for region {}", self.region_id);
486                        self.set_staging(&mut manager).await?;
487                    }
488                    RegionRoleState::Leader(RegionLeaderState::Staging) => {
489                        // Already in desired state - no-op
490                        info!("Region {} already in staging mode", self.region_id);
491                    }
492                    _ => {
493                        return Err(RegionStateSnafu {
494                            region_id: self.region_id,
495                            state: current_state,
496                            expect: RegionRoleState::Leader(RegionLeaderState::Writable),
497                        }
498                        .build());
499                    }
500                }
501                None
502            }
503
504            SettableRegionRoleState::Follower => {
505                // Make this region a follower
506                match current_state {
507                    RegionRoleState::Leader(RegionLeaderState::Staging) => {
508                        info!(
509                            "Exiting staging and demoting region {} to follower",
510                            self.region_id
511                        );
512                        self.exit_staging()?;
513                        self.set_role(RegionRole::Follower);
514                    }
515                    RegionRoleState::Leader(_) => {
516                        info!("Demoting region {} from leader to follower", self.region_id);
517                        self.set_role(RegionRole::Follower);
518                    }
519                    RegionRoleState::Follower => {
520                        // Already in desired state - no-op
521                        info!("Region {} already in follower mode", self.region_id);
522                    }
523                }
524                None
525            }
526
527            SettableRegionRoleState::DowngradingLeader => {
528                // downgrade this region to downgrading leader
529                match current_state {
530                    RegionRoleState::Leader(RegionLeaderState::Staging) => {
531                        info!(
532                            "Exiting staging and entering downgrade for region {}",
533                            self.region_id
534                        );
535                        self.exit_staging()?;
536                        self.set_role(RegionRole::DowngradingLeader);
537                    }
538                    RegionRoleState::Leader(RegionLeaderState::Writable) => {
539                        info!("Starting downgrade for region {}", self.region_id);
540                        self.set_role(RegionRole::DowngradingLeader);
541                    }
542                    RegionRoleState::Leader(RegionLeaderState::Downgrading) => {
543                        // Already in desired state - no-op
544                        info!("Region {} already in downgrading mode", self.region_id);
545                    }
546                    _ => {
547                        warn!(
548                            "Cannot start downgrade for region {} from state {:?}",
549                            self.region_id, current_state
550                        );
551                    }
552                }
553                None
554            }
555        };
556
557        // Hack(zhongzc): If we have just become leader (writable), persist any backfilled metadata.
558        let mut backfill_hook_payload: Option<PendingManifestHook> = None;
559        if self.state() == RegionRoleState::Leader(RegionLeaderState::Writable) {
560            // Persist backfilled metadata if manifest is missing fields (e.g., partition_expr)
561            let manifest_meta = &manager.manifest().metadata;
562            let current_version = self.version();
563            let current_meta = &current_version.metadata;
564            if manifest_meta.partition_expr.is_none() && current_meta.partition_expr.is_some() {
565                let action = RegionMetaAction::Change(RegionChange {
566                    metadata: current_meta.clone(),
567                    sst_format: current_version.options.sst_format.unwrap_or_default(),
568                    append_mode: None,
569                });
570                let action_list = RegionMetaActionList::with_action(action);
571                match self
572                    .manifest_ctx
573                    .update_locked(&mut manager, action_list, false)
574                    .await
575                {
576                    Ok(pending) => {
577                        info!(
578                            "Successfully persisted backfilled metadata for region {}, version: {}",
579                            self.region_id,
580                            pending.version()
581                        );
582                        backfill_hook_payload = Some(pending);
583                    }
584                    Err(e) => {
585                        warn!(e; "Failed to persist backfilled metadata for region {}", self.region_id);
586                    }
587                }
588            }
589        }
590
591        drop(manager);
592
593        // Merge both payloads so consumers see the complete set of actions in
594        // one notification. The lock is released, so it's safe to fire.
595        let merged = match (hook_payload, backfill_hook_payload) {
596            (Some(staging), Some(backfill)) => Some(staging.merge(backfill)),
597            (Some(payload), None) => Some(payload),
598            (None, Some(payload)) => Some(payload),
599            (None, None) => None,
600        };
601
602        if let Some(pending) = merged {
603            pending.fire().await;
604        }
605
606        Ok(())
607    }
608
609    /// Switches the region state to `RegionRoleState::Leader(RegionLeaderState::Writable)` if the current state is `expect`.
610    /// Otherwise, logs an error.
611    pub(crate) fn switch_state_to_writable(&self, expect: RegionLeaderState) {
612        if let Err(e) = self
613            .compare_exchange_state(expect, RegionRoleState::Leader(RegionLeaderState::Writable))
614        {
615            error!(e; "failed to switch region state to writable, expect state is {:?}", expect);
616        }
617    }
618
619    /// Switches the region state to `RegionRoleState::Leader(RegionLeaderState::Staging)` if the current state is `expect`.
620    /// Otherwise, logs an error.
621    pub(crate) fn switch_state_to_staging(&self, expect: RegionLeaderState) {
622        if let Err(e) =
623            self.compare_exchange_state(expect, RegionRoleState::Leader(RegionLeaderState::Staging))
624        {
625            error!(e; "failed to switch region state to staging, expect state is {:?}", expect);
626        }
627    }
628
629    /// Returns the region statistic.
630    pub(crate) fn region_statistic(&self) -> RegionStatistic {
631        let version = self.version();
632        let memtables = &version.memtables;
633        let memtable_usage = (memtables.mutable_usage() + memtables.immutables_usage()) as u64;
634
635        let sst_usage = version.ssts.owned_sst_usage(self.region_id);
636        let index_usage = version.ssts.owned_index_usage(self.region_id);
637        let flushed_entry_id = version.flushed_entry_id;
638
639        let wal_usage = self.estimated_wal_usage(memtable_usage);
640        let manifest_usage = self.stats.total_manifest_size();
641        let num_rows = version.ssts.owned_num_rows(self.region_id) + version.memtables.num_rows();
642        let num_files = version.ssts.owned_num_files(self.region_id);
643        let manifest_version = self.stats.manifest_version();
644        let file_removed_cnt = self.stats.file_removed_cnt();
645
646        let topic_latest_entry_id = self.topic_latest_entry_id.load(Ordering::Relaxed);
647        let written_bytes = self.written_bytes.load(Ordering::Relaxed);
648
649        RegionStatistic {
650            num_rows,
651            memtable_size: memtable_usage,
652            wal_size: wal_usage,
653            manifest_size: manifest_usage,
654            sst_size: sst_usage,
655            sst_num: num_files,
656            index_size: index_usage,
657            manifest: RegionManifestInfo::Mito {
658                manifest_version,
659                flushed_entry_id,
660                file_removed_cnt,
661            },
662            data_topic_latest_entry_id: topic_latest_entry_id,
663            metadata_topic_latest_entry_id: topic_latest_entry_id,
664            written_bytes,
665        }
666    }
667
668    /// Estimated WAL size in bytes.
669    /// Use the memtables size to estimate the size of wal.
670    fn estimated_wal_usage(&self, memtable_usage: u64) -> u64 {
671        ((memtable_usage as f32) * ESTIMATED_WAL_FACTOR) as u64
672    }
673
674    /// Sets the state of the region to given state if the current state equals to
675    /// the expected.
676    fn compare_exchange_state(
677        &self,
678        expect: RegionLeaderState,
679        state: RegionRoleState,
680    ) -> Result<()> {
681        self.manifest_ctx
682            .state
683            .compare_exchange(RegionRoleState::Leader(expect), state)
684            .map_err(|actual| {
685                RegionStateSnafu {
686                    region_id: self.region_id,
687                    state: actual,
688                    expect: RegionRoleState::Leader(expect),
689                }
690                .build()
691            })?;
692        Ok(())
693    }
694
695    pub fn access_layer(&self) -> AccessLayerRef {
696        self.access_layer.clone()
697    }
698
699    /// Returns the region info entry of the region.
700    pub(crate) fn region_info_entry(&self, node_id: Option<u64>) -> RegionInfoEntry {
701        let region_id = self.region_id;
702        let version = self.version();
703        let state = self.state();
704        let role = self.region_role();
705        let region_options = serde_json::to_string(&version.options)
706            .unwrap_or_else(|err| serde_json::json!({ "error": err.to_string() }).to_string());
707        let sst_format = match version.options.sst_format.unwrap_or_default() {
708            crate::sst::FormatType::PrimaryKey => "primary_key",
709            crate::sst::FormatType::Flat => "flat",
710        }
711        .to_string();
712
713        RegionInfoEntry {
714            region_id,
715            table_id: region_id.table_id(),
716            region_number: region_id.region_number(),
717            region_group: region_id.region_group(),
718            region_sequence: region_id.region_sequence(),
719            state: state.as_str().to_string(),
720            role: role.to_string(),
721            writable: self.is_writable(),
722            committed_sequence: self.find_committed_sequence(),
723            flushed_sequence: Some(self.flushed_sequence()).filter(|sequence| *sequence > 0),
724            manifest_version: self.stats.manifest_version(),
725            compaction_time_window: version
726                .compaction_time_window
727                .map(|duration| humantime::format_duration(duration).to_string()),
728            region_options,
729            sst_format,
730            node_id,
731        }
732    }
733
734    /// Returns the SST entries of the region.
735    pub async fn manifest_sst_entries(&self) -> Vec<ManifestSstEntry> {
736        let table_dir = self.table_dir();
737        let path_type = self.access_layer.path_type();
738
739        let visible_ssts = self
740            .version()
741            .ssts
742            .levels()
743            .iter()
744            .flat_map(|level| level.files().map(|file| file.file_id().file_id()))
745            .collect::<HashSet<_>>();
746
747        let manifest_files = self.manifest_ctx.manifest().await.files.clone();
748        let staging_files = self
749            .manifest_ctx
750            .staging_manifest()
751            .await
752            .map(|m| m.files.clone())
753            .unwrap_or_default();
754        let files = manifest_files
755            .into_iter()
756            .chain(staging_files)
757            .collect::<HashMap<_, _>>();
758
759        files
760            .values()
761            .map(|meta| {
762                let region_id = self.region_id;
763                let origin_region_id = meta.region_id;
764                let (index_version, index_file_path, index_file_size) = if meta.index_file_size > 0
765                {
766                    let index_file_path = index_file_path(table_dir, meta.index_id(), path_type);
767                    (
768                        meta.index_version,
769                        Some(index_file_path),
770                        Some(meta.index_file_size),
771                    )
772                } else {
773                    (0, None, None)
774                };
775                let visible = visible_ssts.contains(&meta.file_id);
776                ManifestSstEntry {
777                    table_dir: table_dir.to_string(),
778                    region_id,
779                    table_id: region_id.table_id(),
780                    region_number: region_id.region_number(),
781                    region_group: region_id.region_group(),
782                    region_sequence: region_id.region_sequence(),
783                    file_id: meta.file_id.to_string(),
784                    index_version,
785                    level: meta.level,
786                    file_path: sst_file_path(table_dir, meta.file_id(), path_type),
787                    file_size: meta.file_size,
788                    index_file_path,
789                    index_file_size,
790                    num_rows: meta.num_rows,
791                    num_row_groups: meta.num_row_groups,
792                    num_series: Some(meta.num_series),
793                    min_ts: meta.time_range.0,
794                    max_ts: meta.time_range.1,
795                    sequence: meta.sequence.map(|s| s.get()),
796                    origin_region_id,
797                    node_id: None,
798                    visible,
799                    primary_key_min: meta.primary_key_min.clone(),
800                    primary_key_max: meta.primary_key_max.clone(),
801                }
802            })
803            .collect()
804    }
805
806    /// Returns the file metas of the region by file ids.
807    pub async fn file_metas(&self, file_ids: &[FileId]) -> Vec<Option<FileMeta>> {
808        let manifest_files = self.manifest_ctx.manifest().await.files.clone();
809
810        file_ids
811            .iter()
812            .map(|file_id| manifest_files.get(file_id).cloned())
813            .collect::<Vec<_>>()
814    }
815
    /// Merges all staged manifest actions into the normal manifest and exits staging mode.
    ///
    /// Steps, in order:
    /// 1. Verify that the region is in the staging leader state.
    /// 2. Ask `manager` to merge the staged actions. If there is nothing to merge, the region
    ///    still exits staging and `Ok(None)` is returned.
    /// 3. Validate the merged actions: exactly one of a change action or a partition expr
    ///    change action, plus an edit action. A change action must leave the column
    ///    metadata untouched.
    /// 4. Write the merged actions to the normal (non-staging) manifest.
    /// 5. Apply the metadata change and the edit to the in-memory version control.
    /// 6. Clear the staging manifests (a failure here is only logged) and exit staging.
    ///
    /// The caller must hold the manifest write lock and pass it via `manager`.
    /// Returns `Ok(Some(pending))` when staging manifests were merged, or
    /// `Ok(None)` when there were no staged manifests to merge.
    ///
    /// **Important:** [`fire`](PendingManifestHook::fire) the receipt only after
    /// dropping the lock — the hook may read the manifest and deadlock otherwise.
    ///
    /// # Errors
    ///
    /// Returns a region state error when the region is not staging, an unexpected error when
    /// the merged actions fail validation, and propagates errors from merging, from the
    /// manifest write and from leaving the staging state.
    pub(crate) async fn exit_staging_on_success(
        &self,
        manager: &mut RwLockWriteGuard<'_, RegionManifestManager>,
    ) -> Result<Option<PendingManifestHook>> {
        let current_state = self.manifest_ctx.current_state();
        ensure!(
            current_state == RegionRoleState::Leader(RegionLeaderState::Staging),
            RegionStateSnafu {
                region_id: self.region_id,
                state: current_state,
                expect: RegionRoleState::Leader(RegionLeaderState::Staging),
            }
        );

        // Merge all staged manifest actions
        let merged_actions = match manager.merge_staged_actions(current_state).await? {
            Some(actions) => actions,
            None => {
                info!(
                    "No staged manifests to merge for region {}, exiting staging mode without changes",
                    self.region_id
                );
                // Even if no manifests to merge, we still need to exit staging mode
                self.exit_staging()?;
                return Ok(None);
            }
        };
        // Classify the merged actions; the checks below reject any combination other than
        // "(change XOR partition expr change) AND edit".
        let expect_change = merged_actions.actions.iter().any(|a| a.is_change());
        let expect_partition_expr_change = merged_actions
            .actions
            .iter()
            .any(|a| a.is_partition_expr_change());
        let expect_edit = merged_actions.actions.iter().any(|a| a.is_edit());
        ensure!(
            !(expect_change && expect_partition_expr_change),
            UnexpectedSnafu {
                reason: "unexpected both change and partition expr change actions in merged actions"
            }
        );
        ensure!(
            expect_change || expect_partition_expr_change,
            UnexpectedSnafu {
                reason: "expect a change or partition expr change action in merged actions"
            }
        );
        ensure!(
            expect_edit,
            UnexpectedSnafu {
                reason: "expect an edit action in merged actions"
            }
        );

        // Split a copy of the actions for the in-memory update below; the original list is
        // moved into the manifest write.
        let (merged_partition_expr_change, merged_change, merged_edit) =
            merged_actions.clone().split_region_change_and_edit();
        if let Some(change) = &merged_change {
            // In staging exit we only allow metadata-only updates. A `Change`
            // action is accepted only when column definitions are unchanged;
            // otherwise it is treated as a schema change and rejected.
            let current_column_metadatas = &self.version().metadata.column_metadatas;
            ensure!(
                change.metadata.column_metadatas == *current_column_metadatas,
                UnexpectedSnafu {
                    reason: "change action alters column metadata in staging exit"
                }
            );
        }

        // Submit merged actions using the manifest manager's update method.
        // Pass `false` so it saves to normal directory, not staging.
        let pending = self
            .manifest_ctx
            .update_locked(manager, merged_actions, false)
            .await?;
        let new_version = pending.version();
        info!(
            "Successfully submitted merged staged manifests for region {}, new version: {}",
            self.region_id, new_version
        );

        // Apply the merged changes to in-memory version control
        if let Some(change) = merged_partition_expr_change {
            // Only the partition expr changes: copy the current metadata and patch it.
            let mut new_metadata = self.version().metadata.as_ref().clone();
            new_metadata.set_partition_expr(change.partition_expr);
            self.version_control.alter_metadata(new_metadata.into());
        }
        if let Some(change) = merged_change {
            self.version_control.alter_metadata(change.metadata);
        }
        self.version_control
            .apply_edit(Some(merged_edit), &[], self.file_purger.clone());

        // Clear all staging manifests and transit state
        // Clearing is best-effort: the merged manifest is already written, so a failure is
        // logged and the exit continues.
        if let Err(e) = manager.clear_staging_manifest_and_dir().await {
            error!(e; "Failed to clear staging manifest dir for region {}", self.region_id);
        }
        // NOTE(review): if this fails, `pending` is never handed to the caller even though
        // the manifest was already written — confirm this is acceptable.
        self.exit_staging()?;

        // Return the hook payload; the caller invokes the hook after dropping the lock.
        Ok(Some(pending))
    }
925
926    /// Returns the partition expression string for this region.
927    ///
928    /// If the region is currently in staging state, this returns the partition expression held in
929    /// the staging partition field. Otherwise, it returns the partition expression from the primary
930    /// region metadata (current committed version).
931    pub fn maybe_staging_partition_expr_str(&self) -> Option<String> {
932        let is_staging = self.is_staging();
933        if is_staging {
934            let staging_partition_info = self.manifest_ctx.staging_partition_info();
935            if staging_partition_info.is_none() {
936                warn!(
937                    "Staging partition expr is none for region {} in staging state",
938                    self.region_id
939                );
940            }
941            staging_partition_info
942                .as_ref()
943                .and_then(|info| info.partition_expr().map(ToString::to_string))
944        } else {
945            let version = self.version();
946            version.metadata.partition_expr.clone()
947        }
948    }
949
950    pub fn expected_partition_expr_version(&self) -> u64 {
951        if self.is_staging() {
952            self.manifest_ctx
953                .staging_partition_info()
954                .as_ref()
955                .map(|info| info.partition_rule_version)
956                .unwrap_or_default()
957        } else {
958            self.version().metadata.partition_expr_version
959        }
960    }
961
962    /// Returns whether writes should be rejected for this region in staging mode.
963    pub(crate) fn reject_all_writes_in_staging(&self) -> bool {
964        if !self.is_staging() {
965            return false;
966        }
967        self.manifest_ctx
968            .staging_partition_info()
969            .as_ref()
970            .map(|info| {
971                matches!(
972                    info.partition_directive,
973                    StagingPartitionDirective::RejectAllWrites
974                )
975            })
976            .unwrap_or(false)
977    }
978}
979
// Drop hook for `MitoRegion`: runs `remove_region_metrics()` when the region value is
// dropped.
impl Drop for MitoRegion {
    fn drop(&mut self) {
        // NOTE(review): presumably unregisters the per-region metrics so they do not outlive
        // the region — confirm against `remove_region_metrics`.
        self.remove_region_metrics();
    }
}
985
/// Context to update the region manifest.
///
/// Bundles the manifest manager with the region role state that gates manifest writes, the
/// partition info used while the region is staging, and an optional hook that observes
/// manifest mutations.
#[derive(Debug)]
pub(crate) struct ManifestContext {
    /// Manager to maintain manifest for this region. Logical writes go through
    /// [`update_locked`](Self::update_locked) (or an [`update_manifest`](Self::update_manifest)
    /// variant) so they produce a [`PendingManifestHook`].
    ///
    /// Guarded by an async read-write lock; writers check the region state while holding it.
    pub(crate) manifest_manager: tokio::sync::RwLock<RegionManifestManager>,
    /// The state of the region. The region checks the state before updating
    /// manifest.
    ///
    /// Stored in an atomic cell, so it can be read and switched without taking the manifest
    /// lock.
    state: AtomicCell<RegionRoleState>,
    /// Partition info of the region in staging mode.
    ///
    /// During the staging mode, the region metadata in [`VersionControlRef`] is not updated,
    /// so we need to store the partition info separately.
    ///
    /// `None` when no staging partition info is set; it is cleared again when the region
    /// exits staging.
    staging_partition_info: Mutex<Option<StagingPartitionInfo>>,
    /// Optional region hook for observing manifest mutations.
    hook: Option<RegionHookRef>,
}
1004
1005impl ManifestContext {
1006    pub(crate) fn new(
1007        manager: RegionManifestManager,
1008        state: RegionRoleState,
1009        hook: Option<RegionHookRef>,
1010    ) -> Self {
1011        ManifestContext {
1012            manifest_manager: tokio::sync::RwLock::new(manager),
1013            state: AtomicCell::new(state),
1014            staging_partition_info: Mutex::new(None),
1015            hook,
1016        }
1017    }
1018
    /// Returns the region hook if one is registered.
    ///
    /// The returned value is a clone of the stored hook reference; `None` means no hook was
    /// supplied when the context was created.
    pub(crate) fn hook(&self) -> Option<RegionHookRef> {
        self.hook.clone()
    }
1023
1024    pub(crate) fn staging_partition_info(&self) -> Option<StagingPartitionInfo> {
1025        self.staging_partition_info.lock().unwrap().clone()
1026    }
1027
1028    pub(crate) fn set_staging_partition_info(&self, staging_partition_info: StagingPartitionInfo) {
1029        let mut current = self.staging_partition_info.lock().unwrap();
1030        debug_assert!(current.is_none());
1031        *current = Some(staging_partition_info);
1032    }
1033
1034    fn clear_staging_partition_info(&self) {
1035        *self.staging_partition_info.lock().unwrap() = None;
1036    }
1037
1038    pub(crate) fn exit_staging(
1039        &self,
1040        region_id: RegionId,
1041        next_state: RegionRoleState,
1042    ) -> Result<()> {
1043        self.state
1044            .compare_exchange(
1045                RegionRoleState::Leader(RegionLeaderState::Staging),
1046                next_state,
1047            )
1048            .map_err(|actual| {
1049                RegionStateSnafu {
1050                    region_id,
1051                    state: actual,
1052                    expect: RegionRoleState::Leader(RegionLeaderState::Staging),
1053                }
1054                .build()
1055            })?;
1056        self.clear_staging_partition_info();
1057        Ok(())
1058    }
1059
1060    pub(crate) async fn manifest_version(&self) -> ManifestVersion {
1061        self.manifest_manager
1062            .read()
1063            .await
1064            .manifest()
1065            .manifest_version
1066    }
1067
1068    pub(crate) async fn has_update(&self) -> Result<bool> {
1069        self.manifest_manager.read().await.has_update().await
1070    }
1071
    /// Returns the current region role state.
    ///
    /// This is a plain load of the atomic state cell; it does not take the manifest lock, so
    /// the state may change right after it is read.
    pub(crate) fn current_state(&self) -> RegionRoleState {
        self.state.load()
    }
1076
1077    /// Installs the manifest changes from the current version to the target version (inclusive).
1078    ///
1079    /// Returns installed [RegionManifest].
1080    /// **Note**: This function is not guaranteed to install the target version strictly.
1081    /// The installed version may be greater than the target version.
1082    pub(crate) async fn install_manifest_to(
1083        &self,
1084        version: ManifestVersion,
1085    ) -> Result<Arc<RegionManifest>> {
1086        let mut manager = self.manifest_manager.write().await;
1087        manager.install_manifest_to(version).await?;
1088
1089        Ok(manager.manifest())
1090    }
1091
1092    /// Updates the manifest if current state is `expect_state`.
1093    pub(crate) async fn update_manifest(
1094        &self,
1095        expect_state: RegionLeaderState,
1096        action_list: RegionMetaActionList,
1097        is_staging: bool,
1098    ) -> Result<ManifestVersion> {
1099        self.update_manifest_with_state_check(action_list, is_staging, |current_state, region_id| {
1100            // If expect_state is not downgrading, the current state must be either `expect_state` or downgrading.
1101            //
1102            // A downgrading leader rejects user writes but still allows
1103            // flushing the memtable and updating the manifest.
1104            if expect_state != RegionLeaderState::Downgrading {
1105                if current_state == RegionRoleState::Leader(RegionLeaderState::Downgrading) {
1106                    info!(
1107                        "Region {} is in downgrading leader state, updating manifest. Expect state is {:?}",
1108                        region_id, expect_state
1109                    );
1110                }
1111                ensure!(
1112                    current_state == RegionRoleState::Leader(expect_state)
1113                        || current_state == RegionRoleState::Leader(RegionLeaderState::Downgrading),
1114                    UpdateManifestSnafu {
1115                        region_id,
1116                        state: current_state,
1117                    }
1118                );
1119            } else {
1120                ensure!(
1121                    current_state == RegionRoleState::Leader(expect_state),
1122                    RegionStateSnafu {
1123                        region_id,
1124                        state: current_state,
1125                        expect: RegionRoleState::Leader(expect_state),
1126                    }
1127                );
1128            }
1129
1130            Ok(())
1131        })
1132        .await
1133    }
1134
1135    /// Updates the manifest for compaction.
1136    ///
1137    /// Compaction may finish while a direct external region edit is in the transient
1138    /// `Editing` state. Direct external edits can remove files both when followers
1139    /// apply sync-region metadata and when a writable leader performs a direct edit
1140    /// such as `edit_region()`. Allowing compaction to publish in `Editing` is still
1141    /// safe because publication happens under the manifest write lock and compaction
1142    /// rechecks that its input files are still valid before committing.
1143    ///
1144    /// This intentionally writes to the normal manifest path (`is_staging = false`).
1145    /// Entering staging cancels or waits for active compactions before switching the
1146    /// region to `Staging`, so a compaction that started before staging still finishes
1147    /// against the normal manifest. Even if a manual compaction is requested while the
1148    /// region is already staging, compaction only sees SSTs in the normal visible
1149    /// region version; SSTs from staging manifests are not applied to region version
1150    /// control until staging exits successfully.
1151    pub(crate) async fn update_manifest_for_compaction(
1152        &self,
1153        action_list: RegionMetaActionList,
1154    ) -> Result<ManifestVersion> {
1155        self.update_manifest_with_state_check(action_list, false, |current_state, region_id| {
1156            ensure!(
1157                matches!(
1158                    current_state,
1159                    RegionRoleState::Leader(RegionLeaderState::Writable)
1160                        | RegionRoleState::Leader(RegionLeaderState::Editing)
1161                        | RegionRoleState::Leader(RegionLeaderState::Downgrading)
1162                ),
1163                UpdateManifestSnafu {
1164                    region_id,
1165                    state: current_state,
1166                }
1167            );
1168
1169            Ok(())
1170        })
1171        .await
1172    }
1173
1174    /// Performs a manifest write under a caller-held write lock and returns a
1175    /// [`PendingManifestHook`] to [`fire`](PendingManifestHook::fire) after
1176    /// dropping the lock. This is the sole caller of
1177    /// [`RegionManifestManager::update`], so it is the funnel through which all
1178    /// logical manifest writes notify the hook.
1179    ///
1180    /// Does not validate state or edit applicability — the caller must do so
1181    /// while still holding the lock (see `update_manifest_with_state_check`
1182    /// and `MitoRegion::exit_staging_on_success`).
1183    pub(crate) async fn update_locked(
1184        &self,
1185        manager: &mut RegionManifestManager,
1186        action_list: RegionMetaActionList,
1187        is_staging: bool,
1188    ) -> Result<PendingManifestHook> {
1189        let region_id = manager.manifest().metadata.region_id;
1190        // Clone before `action_list` is moved into `update` so the hook still
1191        // sees what was written.
1192        let action_list_for_hook = self.hook.as_ref().map(|_| action_list.clone());
1193        let version = manager
1194            .update(action_list, is_staging)
1195            .await
1196            .inspect_err(|e| error!(e; "Failed to update manifest, region_id: {}", region_id))?;
1197
1198        Ok(PendingManifestHook::new(
1199            region_id,
1200            action_list_for_hook,
1201            version,
1202            self.hook.clone(),
1203        ))
1204    }
1205
    /// Writes `action_list` to the manifest after validating the region state and the edits.
    ///
    /// Everything up to the manifest write happens under the manifest write lock:
    /// 1. `check_state` is called with the current role state and the region id; its error,
    ///    if any, aborts the update.
    /// 2. If the manifest records a truncation, every edit action is checked against it:
    ///    a flush edit must be newer than the truncation and a compaction edit must only
    ///    remove files the manifest still contains. Otherwise a region truncated error is
    ///    returned.
    /// 3. The actions are written via [`update_locked`](Self::update_locked); `is_staging`
    ///    is forwarded to it.
    ///
    /// The lock is then released and the pending hook is fired. Returns the new manifest
    /// version.
    async fn update_manifest_with_state_check(
        &self,
        action_list: RegionMetaActionList,
        is_staging: bool,
        check_state: impl FnOnce(RegionRoleState, RegionId) -> Result<()>,
    ) -> Result<ManifestVersion> {
        // Acquires the write lock of the manifest manager.
        let mut manager = self.manifest_manager.write().await;
        // Gets current manifest.
        let manifest = manager.manifest();
        // Checks state inside the lock. This is to ensure that we won't update the manifest
        // after `set_readonly_gracefully()` is called.
        let current_state = self.state.load();
        check_state(current_state, manifest.metadata.region_id)?;

        for action in &action_list.actions {
            // Checks whether the edit is still applicable.
            // Only edit actions are validated; other actions are skipped.
            let RegionMetaAction::Edit(edit) = &action else {
                continue;
            };

            // Checks whether the region is truncated.
            // NOTE(review): without a recorded truncation, none of the checks below run,
            // including the `files_to_remove` check — confirm this is intended.
            let Some(truncated_entry_id) = manifest.truncated_entry_id else {
                continue;
            };

            // This is an edit from flush.
            if let Some(flushed_entry_id) = edit.flushed_entry_id {
                // A flush edit is valid after truncate in two cases:
                // 1. `flushed_entry_id` moves past `truncated_entry_id`, meaning it definitely
                //    flushed data newer than the truncate point.
                // 2. `flushed_entry_id` equals `truncated_entry_id`, but `flushed_sequence`
                //    increases. This happens in skip-WAL tables where entry id can stay at 0,
                //    while sequence still advances for post-truncate writes.
                //
                // We still reject stale flushes from before truncate:
                // if entry id is equal and sequence does not advance, the flush is outdated.
                let is_newer_entry = truncated_entry_id < flushed_entry_id;
                let is_same_entry_with_newer_sequence = truncated_entry_id == flushed_entry_id
                    && edit.flushed_sequence.is_some_and(|flushed_sequence| {
                        manifest.flushed_sequence < flushed_sequence
                    });

                ensure!(
                    is_newer_entry || is_same_entry_with_newer_sequence,
                    RegionTruncatedSnafu {
                        region_id: manifest.metadata.region_id,
                    }
                );
            }

            // This is an edit from compaction.
            if !edit.files_to_remove.is_empty() {
                // Every input file of the compaction must still be in the manifest; a
                // missing file means the input has been truncated away.
                for file in &edit.files_to_remove {
                    ensure!(
                        manifest.files.contains_key(&file.file_id),
                        RegionTruncatedSnafu {
                            region_id: manifest.metadata.region_id,
                        }
                    );
                }
            }
        }

        // `update_locked` returns a `PendingManifestHook` we fire after releasing the lock.
        let region_id = manifest.metadata.region_id;
        let pending = self
            .update_locked(&mut manager, action_list, is_staging)
            .await?;
        let version = pending.version();

        // Drop the write lock before invoking the hook. Hook implementations may
        // read the manifest or send region requests that acquire this lock;
        // holding it would deadlock. Slow hooks also must not block manifest updates.
        drop(manager);

        // The role state is not protected by the manifest lock, so the region may have been
        // turned into a follower while the manifest was being written; this is only reported.
        if self.state.load() == RegionRoleState::Follower {
            warn!(
                "Region {} becomes follower while updating manifest which may cause inconsistency, manifest version: {version}",
                region_id
            );
        }

        pending.fire().await;

        Ok(version)
    }
1294
1295    /// Sets the [`RegionRole`].
1296    ///
1297    /// ```text
1298    ///                  +---------------------+
1299    ///                  |   Staging Leader    |
1300    ///                  +----------+----------+
1301    ///                             |
1302    ///                             v
1303    ///     +----------+     +------+-------+     +-------------+
1304    ///     | Follower | <-> |    Leader    | <-> | Downgrading |
1305    ///     +-----+----+     +------+-------+     +------+------+
1306    ///           ^                 ^                    |
1307    ///           +-----------------+--------------------+
1308    ///
1309    /// ```
1310    ///
1311    /// # State Transitions
1312    ///
1313    /// From `Follower`:
1314    /// - `Follower -> Leader`
1315    ///
1316    /// From `Leader`:
1317    /// - `Leader -> Follower`
1318    /// - `Leader -> Downgrading Leader`
1319    ///
1320    /// From `Staging Leader`:
1321    /// - `Staging Leader -> Leader`
1322    /// - `Staging Leader -> Follower`
1323    /// - `Staging Leader -> Downgrading Leader`
1324    ///
1325    /// From `Downgrading Leader`:
1326    /// - `Downgrading Leader -> Leader`
1327    /// - `Downgrading Leader -> Follower`
1328    pub(crate) fn set_role(&self, next_role: RegionRole, region_id: RegionId) {
1329        match next_role {
1330            RegionRole::Follower => {
1331                if self
1332                    .exit_staging(region_id, RegionRoleState::Follower)
1333                    .is_ok()
1334                {
1335                    info!(
1336                        "Convert region {} to follower, previous role state: {:?}",
1337                        region_id,
1338                        RegionRoleState::Leader(RegionLeaderState::Staging)
1339                    );
1340                    return;
1341                }
1342                match self.state.fetch_update(|state| {
1343                    if !matches!(state, RegionRoleState::Follower) {
1344                        Some(RegionRoleState::Follower)
1345                    } else {
1346                        None
1347                    }
1348                }) {
1349                    Ok(state) => info!(
1350                        "Convert region {} to follower, previous role state: {:?}",
1351                        region_id, state
1352                    ),
1353                    Err(state) => {
1354                        if state != RegionRoleState::Follower {
1355                            warn!(
1356                                "Failed to convert region {} to follower, current role state: {:?}",
1357                                region_id, state
1358                            )
1359                        }
1360                    }
1361                }
1362            }
1363            RegionRole::Leader => {
1364                if self
1365                    .exit_staging(
1366                        region_id,
1367                        RegionRoleState::Leader(RegionLeaderState::Writable),
1368                    )
1369                    .is_ok()
1370                {
1371                    info!(
1372                        "Convert region {} to leader, previous role state: {:?}",
1373                        region_id,
1374                        RegionRoleState::Leader(RegionLeaderState::Staging)
1375                    );
1376                    return;
1377                }
1378                match self.state.fetch_update(|state| {
1379                    if matches!(
1380                        state,
1381                        RegionRoleState::Follower
1382                            | RegionRoleState::Leader(RegionLeaderState::Downgrading)
1383                    ) {
1384                        Some(RegionRoleState::Leader(RegionLeaderState::Writable))
1385                    } else {
1386                        None
1387                    }
1388                }) {
1389                    Ok(state) => info!(
1390                        "Convert region {} to leader, previous role state: {:?}",
1391                        region_id, state
1392                    ),
1393                    Err(state) => {
1394                        if state != RegionRoleState::Leader(RegionLeaderState::Writable) {
1395                            warn!(
1396                                "Failed to convert region {} to leader, current role state: {:?}",
1397                                region_id, state
1398                            )
1399                        }
1400                    }
1401                }
1402            }
1403            RegionRole::StagingLeader => {
1404                info!(
1405                    "Ignore direct conversion of region {} to staging leader; staging requires the dedicated workflow",
1406                    region_id
1407                );
1408            }
1409            RegionRole::DowngradingLeader => {
1410                if self
1411                    .exit_staging(
1412                        region_id,
1413                        RegionRoleState::Leader(RegionLeaderState::Downgrading),
1414                    )
1415                    .is_ok()
1416                {
1417                    info!(
1418                        "Convert region {} to downgrading region, previous role state: {:?}",
1419                        region_id,
1420                        RegionRoleState::Leader(RegionLeaderState::Staging)
1421                    );
1422                    return;
1423                }
1424                match self.state.compare_exchange(
1425                    RegionRoleState::Leader(RegionLeaderState::Writable),
1426                    RegionRoleState::Leader(RegionLeaderState::Downgrading),
1427                ) {
1428                    Ok(state) => info!(
1429                        "Convert region {} to downgrading region, previous role state: {:?}",
1430                        region_id, state
1431                    ),
1432                    Err(state) => {
1433                        if state != RegionRoleState::Leader(RegionLeaderState::Downgrading) {
1434                            warn!(
1435                                "Failed to convert region {} to downgrading leader, current role state: {:?}",
1436                                region_id, state
1437                            )
1438                        }
1439                    }
1440                }
1441            }
1442        }
1443    }
1444
1445    /// Returns the normal manifest of the region.
1446    pub(crate) async fn manifest(&self) -> Arc<crate::manifest::action::RegionManifest> {
1447        self.manifest_manager.read().await.manifest()
1448    }
1449
1450    /// Returns the staging manifest of the region.
1451    pub(crate) async fn staging_manifest(
1452        &self,
1453    ) -> Option<Arc<crate::manifest::action::RegionManifest>> {
1454        self.manifest_manager.read().await.staging_manifest()
1455    }
1456}
1457
/// Shared, reference-counted handle to a [`ManifestContext`].
pub(crate) type ManifestContextRef = Arc<ManifestContext>;
1459
/// Regions indexed by ids.
///
/// The map lives behind a read-write lock, so all accessors take `&self`. Lookups hand out
/// clones of the stored region references.
#[derive(Debug, Default)]
pub(crate) struct RegionMap {
    /// Region references keyed by region id.
    regions: RwLock<HashMap<RegionId, MitoRegionRef>>,
}
1465
1466impl RegionMap {
1467    /// Returns true if the region exists.
1468    pub(crate) fn is_region_exists(&self, region_id: RegionId) -> bool {
1469        let regions = self.regions.read().unwrap();
1470        regions.contains_key(&region_id)
1471    }
1472
1473    /// Inserts a new region into the map.
1474    pub(crate) fn insert_region(&self, region: MitoRegionRef) {
1475        let mut regions = self.regions.write().unwrap();
1476        regions.insert(region.region_id, region);
1477    }
1478
1479    /// Gets region by region id.
1480    pub(crate) fn get_region(&self, region_id: RegionId) -> Option<MitoRegionRef> {
1481        let regions = self.regions.read().unwrap();
1482        regions.get(&region_id).cloned()
1483    }
1484
1485    /// Gets writable region by region id.
1486    ///
1487    /// Returns error if the region does not exist or is readonly.
1488    pub(crate) fn writable_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
1489        let region = self
1490            .get_region(region_id)
1491            .context(RegionNotFoundSnafu { region_id })?;
1492        ensure!(
1493            region.is_writable(),
1494            RegionStateSnafu {
1495                region_id,
1496                state: region.state(),
1497                expect: RegionRoleState::Leader(RegionLeaderState::Writable),
1498            }
1499        );
1500        Ok(region)
1501    }
1502
1503    /// Gets readonly region by region id.
1504    ///
1505    /// Returns error if the region does not exist or is writable.
1506    pub(crate) fn follower_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
1507        let region = self
1508            .get_region(region_id)
1509            .context(RegionNotFoundSnafu { region_id })?;
1510        ensure!(
1511            region.is_follower(),
1512            RegionStateSnafu {
1513                region_id,
1514                state: region.state(),
1515                expect: RegionRoleState::Follower,
1516            }
1517        );
1518
1519        Ok(region)
1520    }
1521
1522    /// Gets region by region id.
1523    ///
1524    /// Calls the callback if the region does not exist.
1525    pub(crate) fn get_region_or<F: OnFailure>(
1526        &self,
1527        region_id: RegionId,
1528        cb: &mut F,
1529    ) -> Option<MitoRegionRef> {
1530        match self
1531            .get_region(region_id)
1532            .context(RegionNotFoundSnafu { region_id })
1533        {
1534            Ok(region) => Some(region),
1535            Err(e) => {
1536                cb.on_failure(e);
1537                None
1538            }
1539        }
1540    }
1541
1542    /// Gets writable region by region id.
1543    ///
1544    /// Calls the callback if the region does not exist or is readonly.
1545    pub(crate) fn writable_region_or<F: OnFailure>(
1546        &self,
1547        region_id: RegionId,
1548        cb: &mut F,
1549    ) -> Option<MitoRegionRef> {
1550        match self.writable_region(region_id) {
1551            Ok(region) => Some(region),
1552            Err(e) => {
1553                cb.on_failure(e);
1554                None
1555            }
1556        }
1557    }
1558
1559    /// Gets writable non-staging region by region id.
1560    ///
1561    /// Returns error if the region does not exist, is readonly, or is in staging mode.
1562    pub(crate) fn writable_non_staging_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
1563        let region = self.writable_region(region_id)?;
1564        if region.is_staging() {
1565            return Err(crate::error::RegionStateSnafu {
1566                region_id,
1567                state: region.state(),
1568                expect: RegionRoleState::Leader(RegionLeaderState::Writable),
1569            }
1570            .build());
1571        }
1572        Ok(region)
1573    }
1574
1575    /// Gets staging region by region id.
1576    ///
1577    /// Returns error if the region does not exist or is not in staging state.
1578    pub(crate) fn staging_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
1579        let region = self
1580            .get_region(region_id)
1581            .context(RegionNotFoundSnafu { region_id })?;
1582        ensure!(
1583            region.is_staging(),
1584            RegionStateSnafu {
1585                region_id,
1586                state: region.state(),
1587                expect: RegionRoleState::Leader(RegionLeaderState::Staging),
1588            }
1589        );
1590        Ok(region)
1591    }
1592
1593    /// Gets flushable region by region id.
1594    ///
1595    /// Returns error if the region does not exist or not flushable.
1596    pub(crate) fn flushable_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
1597        let region = self
1598            .get_region(region_id)
1599            .context(RegionNotFoundSnafu { region_id })?;
1600        ensure!(
1601            region.is_flushable(),
1602            FlushableRegionStateSnafu {
1603                region_id,
1604                state: region.state(),
1605            }
1606        );
1607        Ok(region)
1608    }
1609
1610    /// Remove region by id.
1611    pub(crate) fn remove_region(&self, region_id: RegionId) -> Option<MitoRegionRef> {
1612        let mut regions = self.regions.write().unwrap();
1613        regions.remove(&region_id)
1614    }
1615
1616    /// List all regions.
1617    pub(crate) fn list_regions(&self) -> Vec<MitoRegionRef> {
1618        let regions = self.regions.read().unwrap();
1619        regions.values().cloned().collect()
1620    }
1621
1622    /// Clear the map.
1623    pub(crate) fn clear(&self) {
1624        self.regions.write().unwrap().clear();
1625    }
1626}
1627
1628pub(crate) type RegionMapRef = Arc<RegionMap>;
1629
1630/// Opening regions
1631#[derive(Debug, Default)]
1632pub(crate) struct OpeningRegions {
1633    regions: RwLock<HashMap<RegionId, Vec<OptionOutputTx>>>,
1634}
1635
1636impl OpeningRegions {
1637    /// Registers `sender` for an opening region; Otherwise, it returns `None`.
1638    pub(crate) fn wait_for_opening_region(
1639        &self,
1640        region_id: RegionId,
1641        sender: OptionOutputTx,
1642    ) -> Option<OptionOutputTx> {
1643        let mut regions = self.regions.write().unwrap();
1644        match regions.entry(region_id) {
1645            Entry::Occupied(mut senders) => {
1646                senders.get_mut().push(sender);
1647                None
1648            }
1649            Entry::Vacant(_) => Some(sender),
1650        }
1651    }
1652
1653    /// Returns true if the region exists.
1654    pub(crate) fn is_region_exists(&self, region_id: RegionId) -> bool {
1655        let regions = self.regions.read().unwrap();
1656        regions.contains_key(&region_id)
1657    }
1658
1659    /// Inserts a new region into the map.
1660    pub(crate) fn insert_sender(&self, region: RegionId, sender: OptionOutputTx) {
1661        let mut regions = self.regions.write().unwrap();
1662        regions.insert(region, vec![sender]);
1663    }
1664
1665    /// Remove region by id.
1666    pub(crate) fn remove_sender(&self, region_id: RegionId) -> Vec<OptionOutputTx> {
1667        let mut regions = self.regions.write().unwrap();
1668        regions.remove(&region_id).unwrap_or_default()
1669    }
1670
1671    #[cfg(test)]
1672    pub(crate) fn sender_len(&self, region_id: RegionId) -> usize {
1673        let regions = self.regions.read().unwrap();
1674        if let Some(senders) = regions.get(&region_id) {
1675            senders.len()
1676        } else {
1677            0
1678        }
1679    }
1680}
1681
1682pub(crate) type OpeningRegionsRef = Arc<OpeningRegions>;
1683
1684/// The regions that are catching up.
1685#[derive(Debug, Default)]
1686pub(crate) struct CatchupRegions {
1687    regions: RwLock<HashSet<RegionId>>,
1688}
1689
1690impl CatchupRegions {
1691    /// Returns true if the region exists.
1692    pub(crate) fn is_region_exists(&self, region_id: RegionId) -> bool {
1693        let regions = self.regions.read().unwrap();
1694        regions.contains(&region_id)
1695    }
1696
1697    /// Inserts a new region into the set.
1698    pub(crate) fn insert_region(&self, region_id: RegionId) {
1699        let mut regions = self.regions.write().unwrap();
1700        regions.insert(region_id);
1701    }
1702
1703    /// Remove region by id.
1704    pub(crate) fn remove_region(&self, region_id: RegionId) {
1705        let mut regions = self.regions.write().unwrap();
1706        regions.remove(&region_id);
1707    }
1708}
1709
1710pub(crate) type CatchupRegionsRef = Arc<CatchupRegions>;
1711
/// Manifest stats.
///
/// Every counter sits behind an `Arc`, so clones of this struct share the same values.
#[derive(Default, Debug, Clone)]
pub struct ManifestStats {
    /// Shared counter returned by `total_manifest_size()`.
    pub(crate) total_manifest_size: Arc<AtomicU64>,
    /// Shared counter returned by `manifest_version()`.
    pub(crate) manifest_version: Arc<AtomicU64>,
    /// Shared counter returned by `file_removed_cnt()`.
    pub(crate) file_removed_cnt: Arc<AtomicU64>,
}

impl ManifestStats {
    /// Loads a counter with relaxed ordering.
    fn read(counter: &AtomicU64) -> u64 {
        counter.load(Ordering::Relaxed)
    }

    /// Current value of the `total_manifest_size` counter.
    fn total_manifest_size(&self) -> u64 {
        Self::read(&self.total_manifest_size)
    }

    /// Current value of the `manifest_version` counter.
    fn manifest_version(&self) -> u64 {
        Self::read(&self.manifest_version)
    }

    /// Current value of the `file_removed_cnt` counter.
    fn file_removed_cnt(&self) -> u64 {
        Self::read(&self.file_removed_cnt)
    }
}
1733
1734/// Parses the partition expression from a JSON string.
1735pub fn parse_partition_expr(partition_expr_str: Option<&str>) -> Result<Option<PartitionExpr>> {
1736    match partition_expr_str {
1737        None => Ok(None),
1738        Some("") => Ok(None),
1739        Some(json_str) => {
1740            let expr = partition::expr::PartitionExpr::from_json_str(json_str)
1741                .with_context(|_| InvalidPartitionExprSnafu { expr: json_str })?;
1742            Ok(expr)
1743        }
1744    }
1745}
1746
1747#[cfg(test)]
1748mod tests {
1749    use std::sync::Arc;
1750    use std::sync::atomic::AtomicU64;
1751
1752    use common_datasource::compression::CompressionType;
1753    use common_test_util::temp_dir::create_temp_dir;
1754    use crossbeam_utils::atomic::AtomicCell;
1755    use object_store::ObjectStore;
1756    use object_store::services::Fs;
1757    use store_api::logstore::provider::Provider;
1758    use store_api::region_engine::RegionRole;
1759    use store_api::region_request::PathType;
1760    use store_api::storage::{FileId, RegionId};
1761
1762    use crate::access_layer::AccessLayer;
1763    use crate::error::Error;
1764    use crate::manifest::action::{
1765        RegionChange, RegionEdit, RegionMetaAction, RegionMetaActionList, RegionPartitionExprChange,
1766    };
1767    use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
1768    use crate::region::{
1769        ManifestContext, ManifestStats, MitoRegion, RegionLeaderState, RegionRoleState,
1770    };
1771    use crate::sst::FormatType;
1772    use crate::sst::index::intermediate::IntermediateManager;
1773    use crate::sst::index::puffin_manager::PuffinManagerFactory;
1774    use crate::test_util::scheduler_util::SchedulerEnv;
1775    use crate::test_util::version_util::VersionControlBuilder;
1776    use crate::time_provider::StdTimeProvider;
1777
1778    #[test]
1779    fn test_region_state_lock_free() {
1780        assert!(AtomicCell::<RegionRoleState>::is_lock_free());
1781    }
1782
1783    #[test]
1784    fn test_region_role_state_as_str() {
1785        assert_eq!("Follower", RegionRoleState::Follower.as_str());
1786        assert_eq!(
1787            "Leader(Writable)",
1788            RegionRoleState::Leader(RegionLeaderState::Writable).as_str()
1789        );
1790        assert_eq!(
1791            "Leader(Staging)",
1792            RegionRoleState::Leader(RegionLeaderState::Staging).as_str()
1793        );
1794        assert_eq!(
1795            "Leader(Downgrading)",
1796            RegionRoleState::Leader(RegionLeaderState::Downgrading).as_str()
1797        );
1798    }
1799
1800    async fn build_test_region(env: &SchedulerEnv) -> MitoRegion {
1801        let builder = VersionControlBuilder::new();
1802        let version_control = Arc::new(builder.build());
1803        let metadata = version_control.current().version.metadata.clone();
1804
1805        let manager = RegionManifestManager::new(
1806            metadata.clone(),
1807            0,
1808            RegionManifestOptions {
1809                manifest_dir: "".to_string(),
1810                object_store: env.access_layer.object_store().clone(),
1811                compress_type: CompressionType::Uncompressed,
1812                checkpoint_distance: 10,
1813                remove_file_options: Default::default(),
1814                manifest_cache: None,
1815            },
1816            FormatType::PrimaryKey,
1817            &Default::default(),
1818        )
1819        .await
1820        .unwrap();
1821
1822        let manifest_ctx = Arc::new(ManifestContext::new(
1823            manager,
1824            RegionRoleState::Leader(RegionLeaderState::Writable),
1825            None,
1826        ));
1827
1828        MitoRegion {
1829            region_id: metadata.region_id,
1830            version_control,
1831            access_layer: env.access_layer.clone(),
1832            manifest_ctx,
1833            file_purger: crate::test_util::new_noop_file_purger(),
1834            provider: Provider::noop_provider(),
1835            last_flush_millis: Default::default(),
1836            last_schedule_compaction_millis: Default::default(),
1837            time_provider: Arc::new(StdTimeProvider),
1838            topic_latest_entry_id: Default::default(),
1839            written_bytes: Arc::new(AtomicU64::new(0)),
1840            stats: ManifestStats::default(),
1841        }
1842    }
1843
1844    fn empty_edit() -> RegionEdit {
1845        RegionEdit {
1846            files_to_add: Vec::new(),
1847            files_to_remove: Vec::new(),
1848            timestamp_ms: None,
1849            compaction_time_window: None,
1850            flushed_entry_id: None,
1851            flushed_sequence: None,
1852            committed_sequence: None,
1853        }
1854    }
1855
1856    #[tokio::test]
1857    async fn test_compaction_update_manifest_allows_editing_state() {
1858        let env = SchedulerEnv::new().await;
1859        let region = build_test_region(&env).await;
1860        region.set_editing(RegionLeaderState::Writable).unwrap();
1861
1862        let file_id = FileId::random();
1863        let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(RegionEdit {
1864            files_to_add: vec![crate::sst::file::FileMeta {
1865                region_id: region.region_id,
1866                file_id,
1867                level: 1,
1868                ..Default::default()
1869            }],
1870            files_to_remove: Vec::new(),
1871            timestamp_ms: None,
1872            compaction_time_window: None,
1873            flushed_entry_id: None,
1874            flushed_sequence: None,
1875            committed_sequence: None,
1876        }));
1877
1878        region
1879            .manifest_ctx
1880            .update_manifest_for_compaction(action_list)
1881            .await
1882            .unwrap();
1883
1884        assert!(
1885            region
1886                .manifest_ctx
1887                .manifest()
1888                .await
1889                .files
1890                .contains_key(&file_id)
1891        );
1892    }
1893
1894    #[tokio::test]
1895    async fn test_exit_staging_partition_expr_change_and_edit_success() {
1896        let env = SchedulerEnv::new().await;
1897        let region = build_test_region(&env).await;
1898
1899        let mut manager = region.manifest_ctx.manifest_manager.write().await;
1900        region.set_staging(&mut manager).await.unwrap();
1901        manager
1902            .update(
1903                RegionMetaActionList::new(vec![
1904                    RegionMetaAction::PartitionExprChange(RegionPartitionExprChange {
1905                        partition_expr: Some("expr_a".to_string()),
1906                    }),
1907                    RegionMetaAction::Edit(empty_edit()),
1908                ]),
1909                true,
1910            )
1911            .await
1912            .unwrap();
1913
1914        let _hook_payload = region.exit_staging_on_success(&mut manager).await.unwrap();
1915        drop(manager);
1916
1917        assert_eq!(
1918            region.version().metadata.partition_expr.as_deref(),
1919            Some("expr_a")
1920        );
1921        assert_eq!(
1922            region.state(),
1923            RegionRoleState::Leader(RegionLeaderState::Writable)
1924        );
1925    }
1926
1927    #[tokio::test]
1928    async fn test_exit_staging_change_with_same_columns_success() {
1929        let env = SchedulerEnv::new().await;
1930        let region = build_test_region(&env).await;
1931
1932        let mut manager = region.manifest_ctx.manifest_manager.write().await;
1933        region.set_staging(&mut manager).await.unwrap();
1934
1935        let mut changed_metadata = region.version().metadata.as_ref().clone();
1936        changed_metadata.set_partition_expr(Some("expr_b".to_string()));
1937
1938        manager
1939            .update(
1940                RegionMetaActionList::new(vec![
1941                    RegionMetaAction::Change(RegionChange {
1942                        metadata: Arc::new(changed_metadata),
1943                        sst_format: FormatType::PrimaryKey,
1944                        append_mode: None,
1945                    }),
1946                    RegionMetaAction::Edit(empty_edit()),
1947                ]),
1948                true,
1949            )
1950            .await
1951            .unwrap();
1952
1953        let _hook_payload = region.exit_staging_on_success(&mut manager).await.unwrap();
1954        drop(manager);
1955
1956        assert_eq!(
1957            region.version().metadata.partition_expr.as_deref(),
1958            Some("expr_b")
1959        );
1960        assert_eq!(
1961            region.state(),
1962            RegionRoleState::Leader(RegionLeaderState::Writable)
1963        );
1964    }
1965
1966    #[tokio::test]
1967    async fn test_exit_staging_change_with_different_columns_fails() {
1968        let env = SchedulerEnv::new().await;
1969        let region = build_test_region(&env).await;
1970
1971        let mut manager = region.manifest_ctx.manifest_manager.write().await;
1972        region.set_staging(&mut manager).await.unwrap();
1973
1974        let mut changed_metadata = region.version().metadata.as_ref().clone();
1975        changed_metadata.column_metadatas.rotate_left(1);
1976
1977        manager
1978            .update(
1979                RegionMetaActionList::new(vec![
1980                    RegionMetaAction::Change(RegionChange {
1981                        metadata: Arc::new(changed_metadata),
1982                        sst_format: FormatType::PrimaryKey,
1983                        append_mode: None,
1984                    }),
1985                    RegionMetaAction::Edit(empty_edit()),
1986                ]),
1987                true,
1988            )
1989            .await
1990            .unwrap();
1991
1992        let result = region.exit_staging_on_success(&mut manager).await;
1993        assert!(matches!(result, Err(Error::Unexpected { .. })));
1994    }
1995
1996    #[tokio::test]
1997    async fn test_exit_staging_partition_expr_change_and_change_conflict_fails() {
1998        let env = SchedulerEnv::new().await;
1999        let region = build_test_region(&env).await;
2000
2001        let mut manager = region.manifest_ctx.manifest_manager.write().await;
2002        region.set_staging(&mut manager).await.unwrap();
2003
2004        let mut changed_metadata = region.version().metadata.as_ref().clone();
2005        changed_metadata.set_partition_expr(Some("expr_c".to_string()));
2006
2007        manager
2008            .update(
2009                RegionMetaActionList::new(vec![
2010                    RegionMetaAction::PartitionExprChange(RegionPartitionExprChange {
2011                        partition_expr: Some("expr_c".to_string()),
2012                    }),
2013                    RegionMetaAction::Change(RegionChange {
2014                        metadata: Arc::new(changed_metadata),
2015                        sst_format: FormatType::PrimaryKey,
2016                        append_mode: None,
2017                    }),
2018                    RegionMetaAction::Edit(empty_edit()),
2019                ]),
2020                true,
2021            )
2022            .await
2023            .unwrap();
2024
2025        let result = region.exit_staging_on_success(&mut manager).await;
2026        assert!(matches!(result, Err(Error::Unexpected { .. })));
2027    }
2028
2029    #[tokio::test]
2030    async fn test_set_region_state() {
2031        let env = SchedulerEnv::new().await;
2032        let builder = VersionControlBuilder::new();
2033        let version_control = Arc::new(builder.build());
2034        let manifest_ctx = env
2035            .mock_manifest_context(version_control.current().version.metadata.clone())
2036            .await;
2037
2038        let region_id = RegionId::new(1024, 0);
2039        // Leader -> Follower
2040        manifest_ctx.set_role(RegionRole::Follower, region_id);
2041        assert_eq!(manifest_ctx.state.load(), RegionRoleState::Follower);
2042
2043        // Follower -> Leader
2044        manifest_ctx.set_role(RegionRole::Leader, region_id);
2045        assert_eq!(
2046            manifest_ctx.state.load(),
2047            RegionRoleState::Leader(RegionLeaderState::Writable)
2048        );
2049
2050        // Direct Leader -> StagingLeader should be ignored.
2051        manifest_ctx.set_role(RegionRole::StagingLeader, region_id);
2052        assert_eq!(
2053            manifest_ctx.state.load(),
2054            RegionRoleState::Leader(RegionLeaderState::Writable)
2055        );
2056
2057        // Leader -> Downgrading Leader
2058        manifest_ctx.set_role(RegionRole::DowngradingLeader, region_id);
2059        assert_eq!(
2060            manifest_ctx.state.load(),
2061            RegionRoleState::Leader(RegionLeaderState::Downgrading)
2062        );
2063
2064        // Downgrading Leader -> Follower
2065        manifest_ctx.set_role(RegionRole::Follower, region_id);
2066        assert_eq!(manifest_ctx.state.load(), RegionRoleState::Follower);
2067
2068        // Can't downgrade from follower (Follower -> Downgrading Leader)
2069        manifest_ctx.set_role(RegionRole::DowngradingLeader, region_id);
2070        assert_eq!(manifest_ctx.state.load(), RegionRoleState::Follower);
2071
2072        // Set region role too Downgrading Leader
2073        manifest_ctx.set_role(RegionRole::Leader, region_id);
2074        manifest_ctx.set_role(RegionRole::DowngradingLeader, region_id);
2075        assert_eq!(
2076            manifest_ctx.state.load(),
2077            RegionRoleState::Leader(RegionLeaderState::Downgrading)
2078        );
2079
2080        // Downgrading Leader -> Leader
2081        manifest_ctx.set_role(RegionRole::Leader, region_id);
2082        assert_eq!(
2083            manifest_ctx.state.load(),
2084            RegionRoleState::Leader(RegionLeaderState::Writable)
2085        );
2086    }
2087
2088    #[tokio::test]
2089    async fn test_staging_state_validation() {
2090        let env = SchedulerEnv::new().await;
2091        let builder = VersionControlBuilder::new();
2092        let version_control = Arc::new(builder.build());
2093
2094        // Create context with staging state using the correct pattern from SchedulerEnv
2095        let staging_ctx = {
2096            let manager = RegionManifestManager::new(
2097                version_control.current().version.metadata.clone(),
2098                0,
2099                RegionManifestOptions {
2100                    manifest_dir: "".to_string(),
2101                    object_store: env.access_layer.object_store().clone(),
2102                    compress_type: CompressionType::Uncompressed,
2103                    checkpoint_distance: 10,
2104                    remove_file_options: Default::default(),
2105                    manifest_cache: None,
2106                },
2107                FormatType::PrimaryKey,
2108                &Default::default(),
2109            )
2110            .await
2111            .unwrap();
2112            Arc::new(ManifestContext::new(
2113                manager,
2114                RegionRoleState::Leader(RegionLeaderState::Staging),
2115                None,
2116            ))
2117        };
2118
2119        // Test staging state behavior
2120        assert_eq!(
2121            staging_ctx.current_state(),
2122            RegionRoleState::Leader(RegionLeaderState::Staging)
2123        );
2124
2125        // Test writable context for comparison
2126        let writable_ctx = env
2127            .mock_manifest_context(version_control.current().version.metadata.clone())
2128            .await;
2129
2130        assert_eq!(
2131            writable_ctx.current_state(),
2132            RegionRoleState::Leader(RegionLeaderState::Writable)
2133        );
2134    }
2135
2136    #[tokio::test]
2137    async fn test_staging_state_transitions() {
2138        let builder = VersionControlBuilder::new();
2139        let version_control = Arc::new(builder.build());
2140        let metadata = version_control.current().version.metadata.clone();
2141
2142        // Create MitoRegion for testing state transitions
2143        let temp_dir = create_temp_dir("");
2144        let path_str = temp_dir.path().display().to_string();
2145        let fs_builder = Fs::default().root(&path_str);
2146        let object_store = ObjectStore::new(fs_builder).unwrap().finish();
2147
2148        let index_aux_path = temp_dir.path().join("index_aux");
2149        let puffin_mgr = PuffinManagerFactory::new(&index_aux_path, 4096, None, None)
2150            .await
2151            .unwrap();
2152        let intm_mgr = IntermediateManager::init_fs(index_aux_path.to_str().unwrap())
2153            .await
2154            .unwrap();
2155
2156        let access_layer = Arc::new(AccessLayer::new(
2157            "",
2158            PathType::Bare,
2159            object_store,
2160            puffin_mgr,
2161            intm_mgr,
2162        ));
2163
2164        let manager = RegionManifestManager::new(
2165            metadata.clone(),
2166            0,
2167            RegionManifestOptions {
2168                manifest_dir: "".to_string(),
2169                object_store: access_layer.object_store().clone(),
2170                compress_type: CompressionType::Uncompressed,
2171                checkpoint_distance: 10,
2172                remove_file_options: Default::default(),
2173                manifest_cache: None,
2174            },
2175            FormatType::PrimaryKey,
2176            &Default::default(),
2177        )
2178        .await
2179        .unwrap();
2180
2181        let manifest_ctx = Arc::new(ManifestContext::new(
2182            manager,
2183            RegionRoleState::Leader(RegionLeaderState::Writable),
2184            None,
2185        ));
2186
2187        let region = MitoRegion {
2188            region_id: metadata.region_id,
2189            version_control,
2190            access_layer,
2191            manifest_ctx: manifest_ctx.clone(),
2192            file_purger: crate::test_util::new_noop_file_purger(),
2193            provider: Provider::noop_provider(),
2194            last_flush_millis: Default::default(),
2195            last_schedule_compaction_millis: Default::default(),
2196            time_provider: Arc::new(StdTimeProvider),
2197            topic_latest_entry_id: Default::default(),
2198            written_bytes: Arc::new(AtomicU64::new(0)),
2199            stats: ManifestStats::default(),
2200        };
2201
2202        // Test initial state
2203        assert_eq!(
2204            region.state(),
2205            RegionRoleState::Leader(RegionLeaderState::Writable)
2206        );
2207        assert!(!region.is_staging());
2208
2209        // Test transition to staging
2210        let mut manager = manifest_ctx.manifest_manager.write().await;
2211        region.set_staging(&mut manager).await.unwrap();
2212        drop(manager);
2213        assert_eq!(
2214            region.state(),
2215            RegionRoleState::Leader(RegionLeaderState::Staging)
2216        );
2217        assert!(region.is_staging());
2218
2219        // Test transition back to writable
2220        region.exit_staging().unwrap();
2221        assert_eq!(
2222            region.state(),
2223            RegionRoleState::Leader(RegionLeaderState::Writable)
2224        );
2225        assert!(!region.is_staging());
2226
2227        // Test staging directory cleanup: Create dirty staging files before entering staging mode
2228        {
2229            // Create some dummy staging manifest files to simulate interrupted session
2230            let manager = manifest_ctx.manifest_manager.write().await;
2231            let dummy_actions = RegionMetaActionList::new(vec![]);
2232            let dummy_bytes = dummy_actions.encode().unwrap();
2233
2234            // Create dirty staging files with versions 100 and 101
2235            manager.store().save(100, &dummy_bytes, true).await.unwrap();
2236            manager.store().save(101, &dummy_bytes, true).await.unwrap();
2237            drop(manager);
2238
2239            // Verify dirty files exist before entering staging
2240            let manager = manifest_ctx.manifest_manager.read().await;
2241            let dirty_manifests = manager.store().fetch_staging_manifests().await.unwrap();
2242            assert_eq!(
2243                dirty_manifests.len(),
2244                2,
2245                "Should have 2 dirty staging files"
2246            );
2247            drop(manager);
2248
2249            // Enter staging mode - this should clean up the dirty files
2250            let mut manager = manifest_ctx.manifest_manager.write().await;
2251            region.set_staging(&mut manager).await.unwrap();
2252            drop(manager);
2253
2254            // Verify dirty files are cleaned up after entering staging
2255            let manager = manifest_ctx.manifest_manager.read().await;
2256            let cleaned_manifests = manager.store().fetch_staging_manifests().await.unwrap();
2257            assert_eq!(
2258                cleaned_manifests.len(),
2259                0,
2260                "Dirty staging files should be cleaned up"
2261            );
2262            drop(manager);
2263
2264            // Exit staging to restore normal state for remaining tests
2265            region.exit_staging().unwrap();
2266        }
2267
2268        // Test invalid transitions
2269        let mut manager = manifest_ctx.manifest_manager.write().await;
2270        assert!(region.set_staging(&mut manager).await.is_ok()); // Writable -> Staging should work
2271        drop(manager);
2272        let mut manager = manifest_ctx.manifest_manager.write().await;
2273        assert!(region.set_staging(&mut manager).await.is_err()); // Staging -> Staging should fail
2274        drop(manager);
2275        assert!(region.exit_staging().is_ok()); // Staging -> Writable should work
2276        assert!(region.exit_staging().is_err()); // Writable -> Writable should fail
2277    }
2278}