mito2/region.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Mito region.
16
17pub mod catchup;
18pub mod opener;
19pub mod options;
20pub mod utils;
21pub(crate) mod version;
22
23use std::collections::hash_map::Entry;
24use std::collections::{HashMap, HashSet};
25use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
26use std::sync::{Arc, Mutex, RwLock};
27
28use common_telemetry::{error, info, warn};
29use crossbeam_utils::atomic::AtomicCell;
30use partition::expr::PartitionExpr;
31use snafu::{OptionExt, ResultExt, ensure};
32use store_api::ManifestVersion;
33use store_api::codec::PrimaryKeyEncoding;
34use store_api::logstore::provider::Provider;
35use store_api::metadata::RegionMetadataRef;
36use store_api::region_engine::{
37    RegionManifestInfo, RegionRole, RegionStatistic, SettableRegionRoleState,
38};
39use store_api::region_request::PathType;
40use store_api::sst_entry::ManifestSstEntry;
41use store_api::storage::{FileId, RegionId, SequenceNumber};
42use tokio::sync::RwLockWriteGuard;
43pub use utils::*;
44
45use crate::access_layer::AccessLayerRef;
46use crate::error::{
47    InvalidPartitionExprSnafu, RegionNotFoundSnafu, RegionStateSnafu, RegionTruncatedSnafu, Result,
48    UnexpectedSnafu, UpdateManifestSnafu,
49};
50use crate::manifest::action::{
51    RegionChange, RegionManifest, RegionMetaAction, RegionMetaActionList,
52};
53use crate::manifest::manager::RegionManifestManager;
54use crate::region::version::{VersionControlRef, VersionRef};
55use crate::request::{OnFailure, OptionOutputTx};
56use crate::sst::file::FileMeta;
57use crate::sst::file_purger::FilePurgerRef;
58use crate::sst::location::{index_file_path, sst_file_path};
59use crate::time_provider::TimeProviderRef;
60
61/// The approximate factor used to estimate the WAL size from the memtable size.
62const ESTIMATED_WAL_FACTOR: f32 = 0.42825;
63
64/// Region usage statistics, including the region id, WAL usage, SST usage and manifest usage.
65#[derive(Debug)]
66pub struct RegionUsage {
67    pub region_id: RegionId,
68    pub wal_usage: u64,
69    pub sst_usage: u64,
70    pub manifest_usage: u64,
71}
72
73impl RegionUsage {
74    pub fn disk_usage(&self) -> u64 {
75        self.wal_usage + self.sst_usage + self.manifest_usage
76    }
77}
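
// A minimal usage sketch (not part of the original API docs; the values are
// hypothetical): `disk_usage` simply sums the three size components above.
//
//     let usage = RegionUsage {
//         region_id: RegionId::new(1024, 1),
//         wal_usage: 4 << 20,       // 4 MiB
//         sst_usage: 128 << 20,     // 128 MiB
//         manifest_usage: 1 << 20,  // 1 MiB
//     };
//     assert_eq!(usage.disk_usage(), (4 + 128 + 1) << 20);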
78
79#[derive(Debug, Clone, Copy, PartialEq, Eq)]
80pub enum RegionLeaderState {
81    /// The region is opened and is writable.
82    Writable,
83    /// The region is in staging mode: writable, but checkpoint and compaction are disabled.
84    Staging,
85    /// The region is entering staging mode; write requests will be stalled.
86    EnteringStaging,
87    /// The region is altering.
88    Altering,
89    /// The region is dropping.
90    Dropping,
91    /// The region is truncating.
92    Truncating,
93    /// The region is handling a region edit.
94    Editing,
95    /// The region is downgrading from leader to follower.
96    Downgrading,
97}
98
99#[derive(Debug, Clone, Copy, PartialEq, Eq)]
100pub enum RegionRoleState {
101    Leader(RegionLeaderState),
102    Follower,
103}
104
105impl RegionRoleState {
106    /// Converts the region role state to leader state if it is a leader state.
107    pub fn into_leader_state(self) -> Option<RegionLeaderState> {
108        match self {
109            RegionRoleState::Leader(leader_state) => Some(leader_state),
110            RegionRoleState::Follower => None,
111        }
112    }
113}
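
// A minimal sketch of `into_leader_state`: leader states are unwrapped, followers
// map to `None` (illustrative, relying on the derived `PartialEq`):
//
//     assert_eq!(
//         RegionRoleState::Leader(RegionLeaderState::Writable).into_leader_state(),
//         Some(RegionLeaderState::Writable)
//     );
//     assert_eq!(RegionRoleState::Follower.into_leader_state(), None);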
114
115/// Metadata and runtime status of a region.
116///
117/// Writing and reading a region follow a single-writer-multi-reader rule:
118/// - Only the region worker thread this region belongs to can modify the metadata.
119/// - Multiple reader threads are allowed to read a specific `version` of a region.
120#[derive(Debug)]
121pub struct MitoRegion {
122    /// Id of this region.
123    ///
124    /// Accessing region id from the version control is inconvenient so
125    /// we also store it here.
126    pub(crate) region_id: RegionId,
127
128    /// Version controller for this region.
129    ///
130    /// We MUST update the version control inside the write lock of the region manifest manager.
131    pub(crate) version_control: VersionControlRef,
132    /// SSTs accessor for this region.
133    pub(crate) access_layer: AccessLayerRef,
134    /// Context to maintain manifest for this region.
135    pub(crate) manifest_ctx: ManifestContextRef,
136    /// SST file purger.
137    pub(crate) file_purger: FilePurgerRef,
138    /// The provider of log store.
139    pub(crate) provider: Provider,
140    /// Last flush time in millis.
141    last_flush_millis: AtomicI64,
142    /// Last compaction time in millis.
143    last_compaction_millis: AtomicI64,
144    /// Provider to get current time.
145    time_provider: TimeProviderRef,
146    /// The topic's latest entry id since the region's last flushing.
147    /// **Only used for remote WAL pruning.**
148    ///
149    /// The value will be updated to the latest offset of the topic
150    /// if the region receives a flush request or schedules a periodic flush task
151    /// and the region's memtable is empty.
152    ///
153    /// There are no WAL entries in the range [flushed_entry_id, topic_latest_entry_id] for the current region,
154    /// which means the WAL may be pruned up to `topic_latest_entry_id`.
155    pub(crate) topic_latest_entry_id: AtomicU64,
156    /// The total bytes written to the region.
157    pub(crate) written_bytes: Arc<AtomicU64>,
158    /// The partition expression of the region in staging mode.
159    ///
160    /// In staging mode, the region metadata in [`VersionControlRef`] is not updated,
161    /// so we need to store the partition expression separately.
162    /// TODO(weny):
163    /// 1. Reload the staging partition expr during region open.
164    /// 2. Reject requests with a mismatching partition expr.
165    pub(crate) staging_partition_expr: Mutex<Option<String>>,
166    /// Manifest stats.
167    stats: ManifestStats,
168}
169
170pub type MitoRegionRef = Arc<MitoRegion>;
171
172impl MitoRegion {
173    /// Stop background managers for this region.
174    pub(crate) async fn stop(&self) {
175        self.manifest_ctx
176            .manifest_manager
177            .write()
178            .await
179            .stop()
180            .await;
181
182        info!(
183            "Stopped region manifest manager, region_id: {}",
184            self.region_id
185        );
186    }
187
188    /// Returns current metadata of the region.
189    pub fn metadata(&self) -> RegionMetadataRef {
190        let version_data = self.version_control.current();
191        version_data.version.metadata.clone()
192    }
193
194    /// Returns primary key encoding of the region.
195    pub(crate) fn primary_key_encoding(&self) -> PrimaryKeyEncoding {
196        let version_data = self.version_control.current();
197        version_data.version.metadata.primary_key_encoding
198    }
199
200    /// Returns current version of the region.
201    pub(crate) fn version(&self) -> VersionRef {
202        let version_data = self.version_control.current();
203        version_data.version
204    }
205
206    /// Returns last flush timestamp in millis.
207    pub(crate) fn last_flush_millis(&self) -> i64 {
208        self.last_flush_millis.load(Ordering::Relaxed)
209    }
210
211    /// Update flush time to current time.
212    pub(crate) fn update_flush_millis(&self) {
213        let now = self.time_provider.current_time_millis();
214        self.last_flush_millis.store(now, Ordering::Relaxed);
215    }
216
217    /// Returns last compaction timestamp in millis.
218    pub(crate) fn last_compaction_millis(&self) -> i64 {
219        self.last_compaction_millis.load(Ordering::Relaxed)
220    }
221
222    /// Update compaction time to current time.
223    pub(crate) fn update_compaction_millis(&self) {
224        let now = self.time_provider.current_time_millis();
225        self.last_compaction_millis.store(now, Ordering::Relaxed);
226    }
227
228    /// Returns the table dir.
229    pub(crate) fn table_dir(&self) -> &str {
230        self.access_layer.table_dir()
231    }
232
233    /// Returns the path type of the region.
234    pub(crate) fn path_type(&self) -> PathType {
235        self.access_layer.path_type()
236    }
237
238    /// Returns whether the region is writable.
239    pub(crate) fn is_writable(&self) -> bool {
240        matches!(
241            self.manifest_ctx.state.load(),
242            RegionRoleState::Leader(RegionLeaderState::Writable)
243                | RegionRoleState::Leader(RegionLeaderState::Staging)
244        )
245    }
246
247    /// Returns whether the region is flushable.
248    pub(crate) fn is_flushable(&self) -> bool {
249        matches!(
250            self.manifest_ctx.state.load(),
251            RegionRoleState::Leader(RegionLeaderState::Writable)
252                | RegionRoleState::Leader(RegionLeaderState::Staging)
253                | RegionRoleState::Leader(RegionLeaderState::Downgrading)
254        )
255    }
256
257    /// Returns whether the region should abort index building.
258    pub(crate) fn should_abort_index(&self) -> bool {
259        matches!(
260            self.manifest_ctx.state.load(),
261            RegionRoleState::Follower
262                | RegionRoleState::Leader(RegionLeaderState::Dropping)
263                | RegionRoleState::Leader(RegionLeaderState::Truncating)
264                | RegionRoleState::Leader(RegionLeaderState::Downgrading)
265                | RegionRoleState::Leader(RegionLeaderState::Staging)
266        )
267    }
268
269    /// Returns whether the region is downgrading.
270    pub(crate) fn is_downgrading(&self) -> bool {
271        matches!(
272            self.manifest_ctx.state.load(),
273            RegionRoleState::Leader(RegionLeaderState::Downgrading)
274        )
275    }
276
277    /// Returns whether the region is in staging mode.
278    #[allow(dead_code)]
279    pub(crate) fn is_staging(&self) -> bool {
280        self.manifest_ctx.state.load() == RegionRoleState::Leader(RegionLeaderState::Staging)
281    }
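
    // Summary of the state predicates above (derived from their `matches!` arms):
    //
    //     state                         is_writable  is_flushable  should_abort_index
    //     Leader(Writable)              yes          yes           no
    //     Leader(Staging)               yes          yes           yes
    //     Leader(Downgrading)           no           yes           yes
    //     Leader(Dropping|Truncating)   no           no            yes
    //     Follower                      no           no            yes
    //
    // The remaining leader states (Altering, EnteringStaging, Editing) are neither
    // writable, flushable, nor index-aborting.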
282
283    pub fn region_id(&self) -> RegionId {
284        self.region_id
285    }
286
287    pub fn find_committed_sequence(&self) -> SequenceNumber {
288        self.version_control.committed_sequence()
289    }
290
291    /// Returns whether the region is a follower (readonly).
292    pub fn is_follower(&self) -> bool {
293        self.manifest_ctx.state.load() == RegionRoleState::Follower
294    }
295
296    /// Returns the state of the region.
297    pub(crate) fn state(&self) -> RegionRoleState {
298        self.manifest_ctx.state.load()
299    }
300
301    /// Sets the region role state.
302    pub(crate) fn set_role(&self, next_role: RegionRole) {
303        self.manifest_ctx.set_role(next_role, self.region_id);
304    }
305
306    /// Sets the altering state.
307    /// You should call this method in the worker loop.
308    pub(crate) fn set_altering(&self) -> Result<()> {
309        self.compare_exchange_state(
310            RegionLeaderState::Writable,
311            RegionRoleState::Leader(RegionLeaderState::Altering),
312        )
313    }
314
315    /// Sets the dropping state.
316    /// You should call this method in the worker loop.
317    pub(crate) fn set_dropping(&self, expect: RegionLeaderState) -> Result<()> {
318        self.compare_exchange_state(expect, RegionRoleState::Leader(RegionLeaderState::Dropping))
319    }
320
321    /// Sets the truncating state.
322    /// You should call this method in the worker loop.
323    pub(crate) fn set_truncating(&self) -> Result<()> {
324        self.compare_exchange_state(
325            RegionLeaderState::Writable,
326            RegionRoleState::Leader(RegionLeaderState::Truncating),
327        )
328    }
329
330    /// Sets the editing state.
331    /// You should call this method in the worker loop.
332    pub(crate) fn set_editing(&self, expect: RegionLeaderState) -> Result<()> {
333        self.compare_exchange_state(expect, RegionRoleState::Leader(RegionLeaderState::Editing))
334    }
335
336    /// Sets the staging state.
337    ///
338    /// You should call this method in the worker loop.
339    /// Transitions from Writable to Staging state.
340    /// Clears any existing staging manifests before entering staging mode.
341    pub(crate) async fn set_staging(
342        &self,
343        manager: &mut RwLockWriteGuard<'_, RegionManifestManager>,
344    ) -> Result<()> {
345        manager.store().clear_staging_manifests().await?;
346
347        self.compare_exchange_state(
348            RegionLeaderState::Writable,
349            RegionRoleState::Leader(RegionLeaderState::Staging),
350        )
351    }
352
353    /// Sets the entering staging state.
354    pub(crate) fn set_entering_staging(&self) -> Result<()> {
355        self.compare_exchange_state(
356            RegionLeaderState::Writable,
357            RegionRoleState::Leader(RegionLeaderState::EnteringStaging),
358        )
359    }
360
361    /// Exits the staging state back to writable.
362    ///
363    /// You should call this method in the worker loop.
364    /// Transitions from Staging to Writable state.
365    pub fn exit_staging(&self) -> Result<()> {
366        *self.staging_partition_expr.lock().unwrap() = None;
367        self.compare_exchange_state(
368            RegionLeaderState::Staging,
369            RegionRoleState::Leader(RegionLeaderState::Writable),
370        )
371    }
372
373    /// Sets the region role state gracefully. This acquires the manifest write lock.
374    pub(crate) async fn set_role_state_gracefully(
375        &self,
376        state: SettableRegionRoleState,
377    ) -> Result<()> {
378        let mut manager: RwLockWriteGuard<'_, RegionManifestManager> =
379            self.manifest_ctx.manifest_manager.write().await;
380        let current_state = self.state();
381
382        match state {
383            SettableRegionRoleState::Leader => {
384                // Exit staging mode and return to normal writable leader
385                // Only allowed from staging state
386                match current_state {
387                    RegionRoleState::Leader(RegionLeaderState::Staging) => {
388                        info!("Exiting staging mode for region {}", self.region_id);
389                        // Use the success exit path that merges all staged manifests
390                        self.exit_staging_on_success(&mut manager).await?;
391                    }
392                    RegionRoleState::Leader(RegionLeaderState::Writable) => {
393                        // Already in desired state - no-op
394                        info!("Region {} already in normal leader mode", self.region_id);
395                    }
396                    _ => {
397                        // Only staging -> leader transition is allowed
398                        return Err(RegionStateSnafu {
399                            region_id: self.region_id,
400                            state: current_state,
401                            expect: RegionRoleState::Leader(RegionLeaderState::Staging),
402                        }
403                        .build());
404                    }
405                }
406            }
407
408            SettableRegionRoleState::StagingLeader => {
409                // Enter staging mode from normal writable leader
410                // Only allowed from writable leader state
411                match current_state {
412                    RegionRoleState::Leader(RegionLeaderState::Writable) => {
413                        info!("Entering staging mode for region {}", self.region_id);
414                        self.set_staging(&mut manager).await?;
415                    }
416                    RegionRoleState::Leader(RegionLeaderState::Staging) => {
417                        // Already in desired state - no-op
418                        info!("Region {} already in staging mode", self.region_id);
419                    }
420                    _ => {
421                        return Err(RegionStateSnafu {
422                            region_id: self.region_id,
423                            state: current_state,
424                            expect: RegionRoleState::Leader(RegionLeaderState::Writable),
425                        }
426                        .build());
427                    }
428                }
429            }
430
431            SettableRegionRoleState::Follower => {
432                // Make this region a follower
433                match current_state {
434                    RegionRoleState::Leader(RegionLeaderState::Staging) => {
435                        info!(
436                            "Exiting staging and demoting region {} to follower",
437                            self.region_id
438                        );
439                        self.exit_staging()?;
440                        self.set_role(RegionRole::Follower);
441                    }
442                    RegionRoleState::Leader(_) => {
443                        info!("Demoting region {} from leader to follower", self.region_id);
444                        self.set_role(RegionRole::Follower);
445                    }
446                    RegionRoleState::Follower => {
447                        // Already in desired state - no-op
448                        info!("Region {} already in follower mode", self.region_id);
449                    }
450                }
451            }
452
453            SettableRegionRoleState::DowngradingLeader => {
454                // Downgrade this region to a downgrading leader.
455                match current_state {
456                    RegionRoleState::Leader(RegionLeaderState::Staging) => {
457                        info!(
458                            "Exiting staging and entering downgrade for region {}",
459                            self.region_id
460                        );
461                        self.exit_staging()?;
462                        self.set_role(RegionRole::DowngradingLeader);
463                    }
464                    RegionRoleState::Leader(RegionLeaderState::Writable) => {
465                        info!("Starting downgrade for region {}", self.region_id);
466                        self.set_role(RegionRole::DowngradingLeader);
467                    }
468                    RegionRoleState::Leader(RegionLeaderState::Downgrading) => {
469                        // Already in desired state - no-op
470                        info!("Region {} already in downgrading mode", self.region_id);
471                    }
472                    _ => {
473                        warn!(
474                            "Cannot start downgrade for region {} from state {:?}",
475                            self.region_id, current_state
476                        );
477                    }
478                }
479            }
480        }
481
482        // Hack(zhongzc): If we have just become leader (writable), persist any backfilled metadata.
483        if self.state() == RegionRoleState::Leader(RegionLeaderState::Writable) {
484            // Persist backfilled metadata if manifest is missing fields (e.g., partition_expr)
485            let manifest_meta = &manager.manifest().metadata;
486            let current_version = self.version();
487            let current_meta = &current_version.metadata;
488            if manifest_meta.partition_expr.is_none() && current_meta.partition_expr.is_some() {
489                let action = RegionMetaAction::Change(RegionChange {
490                    metadata: current_meta.clone(),
491                    sst_format: current_version.options.sst_format.unwrap_or_default(),
492                });
493                let result = manager
494                    .update(RegionMetaActionList::with_action(action), false)
495                    .await;
496
497                match result {
498                    Ok(version) => {
499                        info!(
500                            "Successfully persisted backfilled metadata for region {}, version: {}",
501                            self.region_id, version
502                        );
503                    }
504                    Err(e) => {
505                        warn!(e; "Failed to persist backfilled metadata for region {}", self.region_id);
506                    }
507                }
508            }
509        }
510
511        drop(manager);
512
513        Ok(())
514    }
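
    // A minimal sketch of the staging transitions handled above (hypothetical call
    // site; assumes `region` is a `MitoRegionRef` held by the worker, error handling
    // omitted):
    //
    //     // Writable leader -> staging leader.
    //     region
    //         .set_role_state_gracefully(SettableRegionRoleState::StagingLeader)
    //         .await?;
    //     // ... write data and accumulate staged manifest actions ...
    //     // Staging leader -> writable leader: merges and publishes the staged manifests.
    //     region
    //         .set_role_state_gracefully(SettableRegionRoleState::Leader)
    //         .await?;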
515
516    /// Switches the region state to `RegionRoleState::Leader(RegionLeaderState::Writable)` if the current state is `expect`.
517    /// Otherwise, logs an error.
518    pub(crate) fn switch_state_to_writable(&self, expect: RegionLeaderState) {
519        if let Err(e) = self
520            .compare_exchange_state(expect, RegionRoleState::Leader(RegionLeaderState::Writable))
521        {
522            error!(e; "failed to switch region state to writable, expect state is {:?}", expect);
523        }
524    }
525
526    /// Switches the region state to `RegionRoleState::Leader(RegionLeaderState::Staging)` if the current state is `expect`.
527    /// Otherwise, logs an error.
528    pub(crate) fn switch_state_to_staging(&self, expect: RegionLeaderState) {
529        if let Err(e) =
530            self.compare_exchange_state(expect, RegionRoleState::Leader(RegionLeaderState::Staging))
531        {
532            error!(e; "failed to switch region state to staging, expect state is {:?}", expect);
533        }
534    }
535
536    /// Returns the region statistic.
537    pub(crate) fn region_statistic(&self) -> RegionStatistic {
538        let version = self.version();
539        let memtables = &version.memtables;
540        let memtable_usage = (memtables.mutable_usage() + memtables.immutables_usage()) as u64;
541
542        let sst_usage = version.ssts.sst_usage();
543        let index_usage = version.ssts.index_usage();
544        let flushed_entry_id = version.flushed_entry_id;
545
546        let wal_usage = self.estimated_wal_usage(memtable_usage);
547        let manifest_usage = self.stats.total_manifest_size();
548        let num_rows = version.ssts.num_rows() + version.memtables.num_rows();
549        let num_files = version.ssts.num_files();
550        let manifest_version = self.stats.manifest_version();
551        let file_removed_cnt = self.stats.file_removed_cnt();
552
553        let topic_latest_entry_id = self.topic_latest_entry_id.load(Ordering::Relaxed);
554        let written_bytes = self.written_bytes.load(Ordering::Relaxed);
555
556        RegionStatistic {
557            num_rows,
558            memtable_size: memtable_usage,
559            wal_size: wal_usage,
560            manifest_size: manifest_usage,
561            sst_size: sst_usage,
562            sst_num: num_files,
563            index_size: index_usage,
564            manifest: RegionManifestInfo::Mito {
565                manifest_version,
566                flushed_entry_id,
567                file_removed_cnt,
568            },
569            data_topic_latest_entry_id: topic_latest_entry_id,
570            metadata_topic_latest_entry_id: topic_latest_entry_id,
571            written_bytes,
572        }
573    }
574
575    /// Estimated WAL size in bytes.
576    /// Uses the memtable size to estimate the WAL size.
577    fn estimated_wal_usage(&self, memtable_usage: u64) -> u64 {
578        ((memtable_usage as f32) * ESTIMATED_WAL_FACTOR) as u64
579    }
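
    // Worked example of the estimate above: a region holding 128 MiB of memtable data
    // is reported as roughly 128 MiB * 0.42825 ≈ 54.8 MiB of WAL usage. This is only a
    // heuristic; the actual WAL size is not measured.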
580
581    /// Sets the state of the region to the given state if the current state equals
582    /// the expected one.
583    fn compare_exchange_state(
584        &self,
585        expect: RegionLeaderState,
586        state: RegionRoleState,
587    ) -> Result<()> {
588        self.manifest_ctx
589            .state
590            .compare_exchange(RegionRoleState::Leader(expect), state)
591            .map_err(|actual| {
592                RegionStateSnafu {
593                    region_id: self.region_id,
594                    state: actual,
595                    expect: RegionRoleState::Leader(expect),
596                }
597                .build()
598            })?;
599        Ok(())
600    }
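
    // A minimal sketch of how the setters above are used (hypothetical call site in
    // the worker loop; assumes `region` is a `MitoRegionRef`):
    //
    //     // Only succeeds when the current state is Leader(Writable); otherwise a
    //     // RegionState error carrying the actual state is returned.
    //     region.set_altering()?;
    //     // ... apply the alteration ...
    //     region.switch_state_to_writable(RegionLeaderState::Altering);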
601
602    pub fn access_layer(&self) -> AccessLayerRef {
603        self.access_layer.clone()
604    }
605
606    /// Returns the SST entries of the region.
607    pub async fn manifest_sst_entries(&self) -> Vec<ManifestSstEntry> {
608        let table_dir = self.table_dir();
609        let path_type = self.access_layer.path_type();
610
611        let visible_ssts = self
612            .version()
613            .ssts
614            .levels()
615            .iter()
616            .flat_map(|level| level.files().map(|file| file.file_id().file_id()))
617            .collect::<HashSet<_>>();
618
619        let manifest_files = self.manifest_ctx.manifest().await.files.clone();
620        let staging_files = self
621            .manifest_ctx
622            .staging_manifest()
623            .await
624            .map(|m| m.files.clone())
625            .unwrap_or_default();
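        // Note: collecting into a `HashMap` below lets staging entries override normal
        // manifest entries that share the same file id, because the staging iterator
        // comes last in the chain.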
626        let files = manifest_files
627            .into_iter()
628            .chain(staging_files.into_iter())
629            .collect::<HashMap<_, _>>();
630
631        files
632            .values()
633            .map(|meta| {
634                let region_id = self.region_id;
635                let origin_region_id = meta.region_id;
636                let (index_version, index_file_path, index_file_size) = if meta.index_file_size > 0
637                {
638                    let index_file_path = index_file_path(table_dir, meta.index_id(), path_type);
639                    (
640                        meta.index_version,
641                        Some(index_file_path),
642                        Some(meta.index_file_size),
643                    )
644                } else {
645                    (0, None, None)
646                };
647                let visible = visible_ssts.contains(&meta.file_id);
648                ManifestSstEntry {
649                    table_dir: table_dir.to_string(),
650                    region_id,
651                    table_id: region_id.table_id(),
652                    region_number: region_id.region_number(),
653                    region_group: region_id.region_group(),
654                    region_sequence: region_id.region_sequence(),
655                    file_id: meta.file_id.to_string(),
656                    index_version,
657                    level: meta.level,
658                    file_path: sst_file_path(table_dir, meta.file_id(), path_type),
659                    file_size: meta.file_size,
660                    index_file_path,
661                    index_file_size,
662                    num_rows: meta.num_rows,
663                    num_row_groups: meta.num_row_groups,
664                    num_series: Some(meta.num_series),
665                    min_ts: meta.time_range.0,
666                    max_ts: meta.time_range.1,
667                    sequence: meta.sequence.map(|s| s.get()),
668                    origin_region_id,
669                    node_id: None,
670                    visible,
671                }
672            })
673            .collect()
674    }
675
676    /// Returns the file metas of the region by file ids.
677    pub async fn file_metas(&self, file_ids: &[FileId]) -> Vec<Option<FileMeta>> {
678        let manifest_files = self.manifest_ctx.manifest().await.files.clone();
679
680        file_ids
681            .iter()
682            .map(|file_id| manifest_files.get(file_id).cloned())
683            .collect::<Vec<_>>()
684    }
685
686    /// Exits staging mode successfully by merging all staged manifests and making them visible.
687    pub(crate) async fn exit_staging_on_success(
688        &self,
689        manager: &mut RwLockWriteGuard<'_, RegionManifestManager>,
690    ) -> Result<()> {
691        let current_state = self.manifest_ctx.current_state();
692        ensure!(
693            current_state == RegionRoleState::Leader(RegionLeaderState::Staging),
694            RegionStateSnafu {
695                region_id: self.region_id,
696                state: current_state,
697                expect: RegionRoleState::Leader(RegionLeaderState::Staging),
698            }
699        );
700
701        // Merge all staged manifest actions
702        let merged_actions = match manager.merge_staged_actions(current_state).await? {
703            Some(actions) => actions,
704            None => {
705                info!(
706                    "No staged manifests to merge for region {}, exiting staging mode without changes",
707                    self.region_id
708                );
709                // Even if there are no manifests to merge, we still need to exit staging mode.
710                self.exit_staging()?;
711                return Ok(());
712            }
713        };
714        let expect_change = merged_actions.actions.iter().any(|a| a.is_change());
715        let expect_edit = merged_actions.actions.iter().any(|a| a.is_edit());
716        ensure!(
717            expect_change,
718            UnexpectedSnafu {
719                reason: "expect a change action in merged actions"
720            }
721        );
722        ensure!(
723            expect_edit,
724            UnexpectedSnafu {
725                reason: "expect an edit action in merged actions"
726            }
727        );
728
729        // Submit merged actions using the manifest manager's update method
730        // Pass `false` so it saves to the normal directory, not the staging one.
731        let new_version = manager.update(merged_actions.clone(), false).await?;
732
733        info!(
734            "Successfully submitted merged staged manifests for region {}, new version: {}",
735            self.region_id, new_version
736        );
737
738        // Apply the merged changes to in-memory version control
739        let (merged_change, merged_edit) = merged_actions.split_region_change_and_edit();
740        // Safety: we have already ensured that there is a change action in the merged actions.
741        let new_metadata = merged_change.as_ref().unwrap().metadata.clone();
742        self.version_control.alter_schema(new_metadata);
743        self.version_control
744            .apply_edit(Some(merged_edit), &[], self.file_purger.clone());
745
746        // Clear all staging manifests and transition the state.
747        if let Err(e) = manager.clear_staging_manifest_and_dir().await {
748            error!(e; "Failed to clear staging manifest dir for region {}", self.region_id);
749        }
750        self.exit_staging()?;
751
752        Ok(())
753    }
754
755    /// Returns the partition expression string for this region.
756    ///
757    /// If the region is currently in staging state, this returns the partition expression held in
758    /// the staging partition field. Otherwise, it returns the partition expression from the primary
759    /// region metadata (current committed version).
760    pub fn maybe_staging_partition_expr_str(&self) -> Option<String> {
761        let is_staging = self.is_staging();
762        if is_staging {
763            let staging_partition_expr = self.staging_partition_expr.lock().unwrap();
764            if staging_partition_expr.is_none() {
765                warn!(
766                    "Staging partition expr is none for region {} in staging state",
767                    self.region_id
768                );
769            }
770            staging_partition_expr.clone()
771        } else {
772            let version = self.version();
773            version.metadata.partition_expr.clone()
774        }
775    }
776}
777
778/// Context to update the region manifest.
779#[derive(Debug)]
780pub(crate) struct ManifestContext {
781    /// Manager to maintain manifest for this region.
782    pub(crate) manifest_manager: tokio::sync::RwLock<RegionManifestManager>,
783    /// The state of the region. The region checks the state before updating
784    /// manifest.
785    state: AtomicCell<RegionRoleState>,
786}
787
788impl ManifestContext {
789    pub(crate) fn new(manager: RegionManifestManager, state: RegionRoleState) -> Self {
790        ManifestContext {
791            manifest_manager: tokio::sync::RwLock::new(manager),
792            state: AtomicCell::new(state),
793        }
794    }
795
796    pub(crate) async fn manifest_version(&self) -> ManifestVersion {
797        self.manifest_manager
798            .read()
799            .await
800            .manifest()
801            .manifest_version
802    }
803
804    pub(crate) async fn has_update(&self) -> Result<bool> {
805        self.manifest_manager.read().await.has_update().await
806    }
807
808    /// Returns the current region role state.
809    pub(crate) fn current_state(&self) -> RegionRoleState {
810        self.state.load()
811    }
812
813    /// Installs the manifest changes from the current version to the target version (inclusive).
814    ///
815    /// Returns installed [RegionManifest].
816    /// **Note**: This function is not guaranteed to install the target version strictly.
817    /// The installed version may be greater than the target version.
818    pub(crate) async fn install_manifest_to(
819        &self,
820        version: ManifestVersion,
821    ) -> Result<Arc<RegionManifest>> {
822        let mut manager = self.manifest_manager.write().await;
823        manager.install_manifest_to(version).await?;
824
825        Ok(manager.manifest())
826    }
827
828    /// Updates the manifest if current state is `expect_state`.
829    pub(crate) async fn update_manifest(
830        &self,
831        expect_state: RegionLeaderState,
832        action_list: RegionMetaActionList,
833        is_staging: bool,
834    ) -> Result<ManifestVersion> {
835        // Acquires the write lock of the manifest manager.
836        let mut manager = self.manifest_manager.write().await;
837        // Gets current manifest.
838        let manifest = manager.manifest();
839        // Checks state inside the lock. This is to ensure that we won't update the manifest
840        // after `set_role_state_gracefully()` is called.
841        let current_state = self.state.load();
842
843        // If expect_state is not downgrading, the current state must be either `expect_state` or downgrading.
844        //
845        // A downgrading leader rejects user writes but still allows
846        // flushing the memtable and updating the manifest.
847        if expect_state != RegionLeaderState::Downgrading {
848            if current_state == RegionRoleState::Leader(RegionLeaderState::Downgrading) {
849                info!(
850                    "Region {} is in downgrading leader state, updating manifest. state is {:?}",
851                    manifest.metadata.region_id, expect_state
852                );
853            }
854            ensure!(
855                current_state == RegionRoleState::Leader(expect_state)
856                    || current_state == RegionRoleState::Leader(RegionLeaderState::Downgrading),
857                UpdateManifestSnafu {
858                    region_id: manifest.metadata.region_id,
859                    state: current_state,
860                }
861            );
862        } else {
863            ensure!(
864                current_state == RegionRoleState::Leader(expect_state),
865                RegionStateSnafu {
866                    region_id: manifest.metadata.region_id,
867                    state: current_state,
868                    expect: RegionRoleState::Leader(expect_state),
869                }
870            );
871        }
872
873        for action in &action_list.actions {
874            // Checks whether the edit is still applicable.
875            let RegionMetaAction::Edit(edit) = &action else {
876                continue;
877            };
878
879            // Checks whether the region is truncated.
880            let Some(truncated_entry_id) = manifest.truncated_entry_id else {
881                continue;
882            };
883
884            // This is an edit from flush.
885            if let Some(flushed_entry_id) = edit.flushed_entry_id {
886                ensure!(
887                    truncated_entry_id < flushed_entry_id,
888                    RegionTruncatedSnafu {
889                        region_id: manifest.metadata.region_id,
890                    }
891                );
892            }
893
894            // This is an edit from compaction.
895            if !edit.files_to_remove.is_empty() {
896                // Returns an error if input files of the compaction task have been truncated.
897                for file in &edit.files_to_remove {
898                    ensure!(
899                        manifest.files.contains_key(&file.file_id),
900                        RegionTruncatedSnafu {
901                            region_id: manifest.metadata.region_id,
902                        }
903                    );
904                }
905            }
906        }
907
908        // Now we can update the manifest.
909        let version = manager.update(action_list, is_staging).await.inspect_err(
910            |e| error!(e; "Failed to update manifest, region_id: {}", manifest.metadata.region_id),
911        )?;
912
913        if self.state.load() == RegionRoleState::Follower {
914            warn!(
915                "Region {} becomes follower while updating manifest which may cause inconsistency, manifest version: {version}",
916                manifest.metadata.region_id
917            );
918        }
919
920        Ok(version)
921    }
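
    // A minimal sketch of a caller (hypothetical flush path; `edit` and the
    // surrounding flush logic are assumed, error handling omitted):
    //
    //     let actions = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit));
    //     let version = manifest_ctx
    //         .update_manifest(RegionLeaderState::Writable, actions, false)
    //         .await?;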
922
923    /// Sets the [`RegionRole`].
924    ///
925    /// ```text
926    ///     +------------------------------------------+
927    ///     |                      +-----------------+ |
928    ///     |                      |                 | |
929    /// +---+------+       +-------+-----+        +--v-v---+
930    /// | Follower |       | Downgrading |        | Leader |
931    /// +---^-^----+       +-----+-^-----+        +--+-+---+
932    ///     | |                  | |                 | |
933    ///     | +------------------+ +-----------------+ |
934    ///     +------------------------------------------+
935    ///
936    /// Transition:
937    /// - Follower -> Leader
938    /// - Downgrading Leader -> Leader
939    /// - Leader -> Follower
940    /// - Downgrading Leader -> Follower
941    /// - Leader -> Downgrading Leader
942    ///
943    /// ```
944    pub(crate) fn set_role(&self, next_role: RegionRole, region_id: RegionId) {
945        match next_role {
946            RegionRole::Follower => {
947                match self.state.fetch_update(|state| {
948                    if !matches!(state, RegionRoleState::Follower) {
949                        Some(RegionRoleState::Follower)
950                    } else {
951                        None
952                    }
953                }) {
954                    Ok(state) => info!(
955                        "Convert region {} to follower, previous role state: {:?}",
956                        region_id, state
957                    ),
958                    Err(state) => {
959                        if state != RegionRoleState::Follower {
960                            warn!(
961                                "Failed to convert region {} to follower, current role state: {:?}",
962                                region_id, state
963                            )
964                        }
965                    }
966                }
967            }
968            RegionRole::Leader => {
969                match self.state.fetch_update(|state| {
970                    if matches!(
971                        state,
972                        RegionRoleState::Follower
973                            | RegionRoleState::Leader(RegionLeaderState::Downgrading)
974                    ) {
975                        Some(RegionRoleState::Leader(RegionLeaderState::Writable))
976                    } else {
977                        None
978                    }
979                }) {
980                    Ok(state) => info!(
981                        "Convert region {} to leader, previous role state: {:?}",
982                        region_id, state
983                    ),
984                    Err(state) => {
985                        if state != RegionRoleState::Leader(RegionLeaderState::Writable) {
986                            warn!(
987                                "Failed to convert region {} to leader, current role state: {:?}",
988                                region_id, state
989                            )
990                        }
991                    }
992                }
993            }
994            RegionRole::DowngradingLeader => {
995                match self.state.compare_exchange(
996                    RegionRoleState::Leader(RegionLeaderState::Writable),
997                    RegionRoleState::Leader(RegionLeaderState::Downgrading),
998                ) {
999                    Ok(state) => info!(
1000                        "Convert region {} to downgrading region, previous role state: {:?}",
1001                        region_id, state
1002                    ),
1003                    Err(state) => {
1004                        if state != RegionRoleState::Leader(RegionLeaderState::Downgrading) {
1005                            warn!(
1006                                "Failed to convert region {} to downgrading leader, current role state: {:?}",
1007                                region_id, state
1008                            )
1009                        }
1010                    }
1011                }
1012            }
1013        }
1014    }
1015
1016    /// Returns the normal manifest of the region.
1017    pub(crate) async fn manifest(&self) -> Arc<crate::manifest::action::RegionManifest> {
1018        self.manifest_manager.read().await.manifest()
1019    }
1020
1021    /// Returns the staging manifest of the region.
1022    pub(crate) async fn staging_manifest(
1023        &self,
1024    ) -> Option<Arc<crate::manifest::action::RegionManifest>> {
1025        self.manifest_manager.read().await.staging_manifest()
1026    }
1027}
1028
1029pub(crate) type ManifestContextRef = Arc<ManifestContext>;
1030
1031/// Regions indexed by ids.
1032#[derive(Debug, Default)]
1033pub(crate) struct RegionMap {
1034    regions: RwLock<HashMap<RegionId, MitoRegionRef>>,
1035}
1036
1037impl RegionMap {
1038    /// Returns true if the region exists.
1039    pub(crate) fn is_region_exists(&self, region_id: RegionId) -> bool {
1040        let regions = self.regions.read().unwrap();
1041        regions.contains_key(&region_id)
1042    }
1043
1044    /// Inserts a new region into the map.
1045    pub(crate) fn insert_region(&self, region: MitoRegionRef) {
1046        let mut regions = self.regions.write().unwrap();
1047        regions.insert(region.region_id, region);
1048    }
1049
1050    /// Gets region by region id.
1051    pub(crate) fn get_region(&self, region_id: RegionId) -> Option<MitoRegionRef> {
1052        let regions = self.regions.read().unwrap();
1053        regions.get(&region_id).cloned()
1054    }
1055
1056    /// Gets writable region by region id.
1057    ///
1058    /// Returns error if the region does not exist or is readonly.
1059    pub(crate) fn writable_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
1060        let region = self
1061            .get_region(region_id)
1062            .context(RegionNotFoundSnafu { region_id })?;
1063        ensure!(
1064            region.is_writable(),
1065            RegionStateSnafu {
1066                region_id,
1067                state: region.state(),
1068                expect: RegionRoleState::Leader(RegionLeaderState::Writable),
1069            }
1070        );
1071        Ok(region)
1072    }
1073
1074    /// Gets readonly region by region id.
1075    ///
1076    /// Returns error if the region does not exist or is not a follower.
1077    pub(crate) fn follower_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
1078        let region = self
1079            .get_region(region_id)
1080            .context(RegionNotFoundSnafu { region_id })?;
1081        ensure!(
1082            region.is_follower(),
1083            RegionStateSnafu {
1084                region_id,
1085                state: region.state(),
1086                expect: RegionRoleState::Follower,
1087            }
1088        );
1089
1090        Ok(region)
1091    }
1092
1093    /// Gets region by region id.
1094    ///
1095    /// Calls the callback if the region does not exist.
1096    pub(crate) fn get_region_or<F: OnFailure>(
1097        &self,
1098        region_id: RegionId,
1099        cb: &mut F,
1100    ) -> Option<MitoRegionRef> {
1101        match self
1102            .get_region(region_id)
1103            .context(RegionNotFoundSnafu { region_id })
1104        {
1105            Ok(region) => Some(region),
1106            Err(e) => {
1107                cb.on_failure(e);
1108                None
1109            }
1110        }
1111    }
1112
1113    /// Gets writable region by region id.
1114    ///
1115    /// Calls the callback if the region does not exist or is readonly.
1116    pub(crate) fn writable_region_or<F: OnFailure>(
1117        &self,
1118        region_id: RegionId,
1119        cb: &mut F,
1120    ) -> Option<MitoRegionRef> {
1121        match self.writable_region(region_id) {
1122            Ok(region) => Some(region),
1123            Err(e) => {
1124                cb.on_failure(e);
1125                None
1126            }
1127        }
1128    }
1129
1130    /// Gets writable non-staging region by region id.
1131    ///
1132    /// Returns error if the region does not exist, is readonly, or is in staging mode.
1133    pub(crate) fn writable_non_staging_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
1134        let region = self.writable_region(region_id)?;
1135        if region.is_staging() {
1136            return Err(crate::error::RegionStateSnafu {
1137                region_id,
1138                state: region.state(),
1139                expect: RegionRoleState::Leader(RegionLeaderState::Writable),
1140            }
1141            .build());
1142        }
1143        Ok(region)
1144    }
1145
1146    /// Gets staging region by region id.
1147    ///
1148    /// Returns error if the region does not exist or is not in staging state.
1149    pub(crate) fn staging_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
1150        let region = self
1151            .get_region(region_id)
1152            .context(RegionNotFoundSnafu { region_id })?;
1153        ensure!(
1154            region.is_staging(),
1155            RegionStateSnafu {
1156                region_id,
1157                state: region.state(),
1158                expect: RegionRoleState::Leader(RegionLeaderState::Staging),
1159            }
1160        );
1161        Ok(region)
1162    }
1163
1164    /// Gets flushable region by region id.
1165    ///
1166    /// Returns error if the region does not exist.
1167    /// Returns `None` if the region exists but is not flushable.
1168    fn flushable_region(&self, region_id: RegionId) -> Result<Option<MitoRegionRef>> {
1169        let region = self
1170            .get_region(region_id)
1171            .context(RegionNotFoundSnafu { region_id })?;
1172        if region.is_flushable() {
1173            Ok(Some(region))
1174        } else {
1175            Ok(None)
1176        }
1177    }
1178
1179    /// Gets flushable region by region id.
1180    ///
1181    /// Calls the callback if the region does not exist.
1182    /// Returns `None` if the region exists but is not flushable.
1183    pub(crate) fn flushable_region_or<F: OnFailure>(
1184        &self,
1185        region_id: RegionId,
1186        cb: &mut F,
1187    ) -> Option<MitoRegionRef> {
1188        match self.flushable_region(region_id) {
1189            Ok(region) => region,
1190            Err(e) => {
1191                cb.on_failure(e);
1192                None
1193            }
1194        }
1195    }
1196
1197    /// Remove region by id.
1198    pub(crate) fn remove_region(&self, region_id: RegionId) -> Option<MitoRegionRef> {
1199        let mut regions = self.regions.write().unwrap();
1200        regions.remove(&region_id)
1201    }
1202
1203    /// List all regions.
1204    pub(crate) fn list_regions(&self) -> Vec<MitoRegionRef> {
1205        let regions = self.regions.read().unwrap();
1206        regions.values().cloned().collect()
1207    }
1208
1209    /// Clear the map.
1210    pub(crate) fn clear(&self) {
1211        self.regions.write().unwrap().clear();
1212    }
1213}
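
// A minimal sketch of how the worker loop typically resolves regions from the map
// (hypothetical call site; assumes `region_map`, `region_id`, and a `sender` that
// implements `OnFailure`):
//
//     // Replies to the sender with a RegionNotFound/RegionState error and bails out
//     // if the region is missing or not writable.
//     let Some(region) = region_map.writable_region_or(region_id, &mut sender) else {
//         return;
//     };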
1214
1215pub(crate) type RegionMapRef = Arc<RegionMap>;
1216
1217/// Regions that are being opened.
1218#[derive(Debug, Default)]
1219pub(crate) struct OpeningRegions {
1220    regions: RwLock<HashMap<RegionId, Vec<OptionOutputTx>>>,
1221}
1222
1223impl OpeningRegions {
1224    /// Registers `sender` for a region that is already being opened and returns `None`; otherwise, returns the `sender` back.
1225    pub(crate) fn wait_for_opening_region(
1226        &self,
1227        region_id: RegionId,
1228        sender: OptionOutputTx,
1229    ) -> Option<OptionOutputTx> {
1230        let mut regions = self.regions.write().unwrap();
1231        match regions.entry(region_id) {
1232            Entry::Occupied(mut senders) => {
1233                senders.get_mut().push(sender);
1234                None
1235            }
1236            Entry::Vacant(_) => Some(sender),
1237        }
1238    }
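
    // A minimal sketch of the intended open-deduplication flow (hypothetical caller;
    // `opening_regions`, `region_id`, and `sender` are assumptions):
    //
    //     if let Some(sender) = opening_regions.wait_for_opening_region(region_id, sender) {
    //         // No open is in flight yet: register this sender and start opening.
    //         opening_regions.insert_sender(region_id, sender);
    //         // ... spawn the actual open task ...
    //     } else {
    //         // Another open is already in flight; the sender was queued and will be
    //         // notified when that open finishes.
    //     }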
1239
1240    /// Returns true if the region exists.
1241    pub(crate) fn is_region_exists(&self, region_id: RegionId) -> bool {
1242        let regions = self.regions.read().unwrap();
1243        regions.contains_key(&region_id)
1244    }
1245
1246    /// Inserts a new opening region with its sender into the map.
1247    pub(crate) fn insert_sender(&self, region: RegionId, sender: OptionOutputTx) {
1248        let mut regions = self.regions.write().unwrap();
1249        regions.insert(region, vec![sender]);
1250    }
1251
1252    /// Removes the region by id and returns its senders.
1253    pub(crate) fn remove_sender(&self, region_id: RegionId) -> Vec<OptionOutputTx> {
1254        let mut regions = self.regions.write().unwrap();
1255        regions.remove(&region_id).unwrap_or_default()
1256    }
1257
1258    #[cfg(test)]
1259    pub(crate) fn sender_len(&self, region_id: RegionId) -> usize {
1260        let regions = self.regions.read().unwrap();
1261        if let Some(senders) = regions.get(&region_id) {
1262            senders.len()
1263        } else {
1264            0
1265        }
1266    }
1267}
1268
1269pub(crate) type OpeningRegionsRef = Arc<OpeningRegions>;
1270
1271/// The regions that are catching up.
1272#[derive(Debug, Default)]
1273pub(crate) struct CatchupRegions {
1274    regions: RwLock<HashSet<RegionId>>,
1275}
1276
1277impl CatchupRegions {
1278    /// Returns true if the region exists.
1279    pub(crate) fn is_region_exists(&self, region_id: RegionId) -> bool {
1280        let regions = self.regions.read().unwrap();
1281        regions.contains(&region_id)
1282    }
1283
1284    /// Inserts a new region into the set.
1285    pub(crate) fn insert_region(&self, region_id: RegionId) {
1286        let mut regions = self.regions.write().unwrap();
1287        regions.insert(region_id);
1288    }
1289
1290    /// Remove region by id.
1291    pub(crate) fn remove_region(&self, region_id: RegionId) {
1292        let mut regions = self.regions.write().unwrap();
1293        regions.remove(&region_id);
1294    }
1295}
1296
1297pub(crate) type CatchupRegionsRef = Arc<CatchupRegions>;
1298
1299/// Manifest stats.
1300#[derive(Default, Debug, Clone)]
1301pub struct ManifestStats {
1302    pub(crate) total_manifest_size: Arc<AtomicU64>,
1303    pub(crate) manifest_version: Arc<AtomicU64>,
1304    pub(crate) file_removed_cnt: Arc<AtomicU64>,
1305}
1306
1307impl ManifestStats {
1308    fn total_manifest_size(&self) -> u64 {
1309        self.total_manifest_size.load(Ordering::Relaxed)
1310    }
1311
1312    fn manifest_version(&self) -> u64 {
1313        self.manifest_version.load(Ordering::Relaxed)
1314    }
1315
1316    fn file_removed_cnt(&self) -> u64 {
1317        self.file_removed_cnt.load(Ordering::Relaxed)
1318    }
1319}
1320
1321/// Parses the partition expression from a JSON string.
1322pub fn parse_partition_expr(partition_expr_str: Option<&str>) -> Result<Option<PartitionExpr>> {
1323    match partition_expr_str {
1324        None => Ok(None),
1325        Some("") => Ok(None),
1326        Some(json_str) => {
1327            let expr = partition::expr::PartitionExpr::from_json_str(json_str)
1328                .with_context(|_| InvalidPartitionExprSnafu { expr: json_str })?;
1329            Ok(expr)
1330        }
1331    }
1332}
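
// A minimal sketch of the edge cases handled above: a missing or empty expression
// parses to `None`, while a malformed JSON string is mapped to an
// `InvalidPartitionExpr` error.
//
//     assert!(parse_partition_expr(None).unwrap().is_none());
//     assert!(parse_partition_expr(Some("")).unwrap().is_none());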
1333
1334#[cfg(test)]
1335mod tests {
1336    use std::sync::atomic::AtomicU64;
1337    use std::sync::{Arc, Mutex};
1338
1339    use common_datasource::compression::CompressionType;
1340    use common_test_util::temp_dir::create_temp_dir;
1341    use crossbeam_utils::atomic::AtomicCell;
1342    use object_store::ObjectStore;
1343    use object_store::services::Fs;
1344    use store_api::logstore::provider::Provider;
1345    use store_api::region_engine::RegionRole;
1346    use store_api::region_request::PathType;
1347    use store_api::storage::RegionId;
1348
1349    use crate::access_layer::AccessLayer;
1350    use crate::manifest::action::RegionMetaActionList;
1351    use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
1352    use crate::region::{
1353        ManifestContext, ManifestStats, MitoRegion, RegionLeaderState, RegionRoleState,
1354    };
1355    use crate::sst::FormatType;
1356    use crate::sst::index::intermediate::IntermediateManager;
1357    use crate::sst::index::puffin_manager::PuffinManagerFactory;
1358    use crate::test_util::scheduler_util::SchedulerEnv;
1359    use crate::test_util::version_util::VersionControlBuilder;
1360    use crate::time_provider::StdTimeProvider;
1361
1362    #[test]
1363    fn test_region_state_lock_free() {
1364        assert!(AtomicCell::<RegionRoleState>::is_lock_free());
1365    }
1366
1367    #[tokio::test]
1368    async fn test_set_region_state() {
1369        let env = SchedulerEnv::new().await;
1370        let builder = VersionControlBuilder::new();
1371        let version_control = Arc::new(builder.build());
1372        let manifest_ctx = env
1373            .mock_manifest_context(version_control.current().version.metadata.clone())
1374            .await;
1375
1376        let region_id = RegionId::new(1024, 0);
1377        // Leader -> Follower
1378        manifest_ctx.set_role(RegionRole::Follower, region_id);
1379        assert_eq!(manifest_ctx.state.load(), RegionRoleState::Follower);
1380
1381        // Follower -> Leader
1382        manifest_ctx.set_role(RegionRole::Leader, region_id);
1383        assert_eq!(
1384            manifest_ctx.state.load(),
1385            RegionRoleState::Leader(RegionLeaderState::Writable)
1386        );
1387
1388        // Leader -> Downgrading Leader
1389        manifest_ctx.set_role(RegionRole::DowngradingLeader, region_id);
1390        assert_eq!(
1391            manifest_ctx.state.load(),
1392            RegionRoleState::Leader(RegionLeaderState::Downgrading)
1393        );
1394
1395        // Downgrading Leader -> Follower
1396        manifest_ctx.set_role(RegionRole::Follower, region_id);
1397        assert_eq!(manifest_ctx.state.load(), RegionRoleState::Follower);
1398
1399        // Can't downgrade from follower (Follower -> Downgrading Leader)
1400        manifest_ctx.set_role(RegionRole::DowngradingLeader, region_id);
1401        assert_eq!(manifest_ctx.state.load(), RegionRoleState::Follower);
1402
1403        // Set region role to Downgrading Leader
1404        manifest_ctx.set_role(RegionRole::Leader, region_id);
1405        manifest_ctx.set_role(RegionRole::DowngradingLeader, region_id);
1406        assert_eq!(
1407            manifest_ctx.state.load(),
1408            RegionRoleState::Leader(RegionLeaderState::Downgrading)
1409        );
1410
1411        // Downgrading Leader -> Leader
1412        manifest_ctx.set_role(RegionRole::Leader, region_id);
1413        assert_eq!(
1414            manifest_ctx.state.load(),
1415            RegionRoleState::Leader(RegionLeaderState::Writable)
1416        );
1417    }

    #[tokio::test]
    async fn test_staging_state_validation() {
        let env = SchedulerEnv::new().await;
        let builder = VersionControlBuilder::new();
        let version_control = Arc::new(builder.build());

        // Build a manifest context that starts in the staging state, mirroring
        // the setup used by SchedulerEnv.
        let staging_ctx = {
            let manager = RegionManifestManager::new(
                version_control.current().version.metadata.clone(),
                0,
                RegionManifestOptions {
                    manifest_dir: "".to_string(),
                    object_store: env.access_layer.object_store().clone(),
                    compress_type: CompressionType::Uncompressed,
                    checkpoint_distance: 10,
                    remove_file_options: Default::default(),
                    manifest_cache: None,
                },
                FormatType::PrimaryKey,
                &Default::default(),
            )
            .await
            .unwrap();
            Arc::new(ManifestContext::new(
                manager,
                RegionRoleState::Leader(RegionLeaderState::Staging),
            ))
        };

        // The context should report the staging state it was created with.
        assert_eq!(
            staging_ctx.current_state(),
            RegionRoleState::Leader(RegionLeaderState::Staging)
        );

        // For comparison, a context created through SchedulerEnv starts writable.
        let writable_ctx = env
            .mock_manifest_context(version_control.current().version.metadata.clone())
            .await;

        assert_eq!(
            writable_ctx.current_state(),
            RegionRoleState::Leader(RegionLeaderState::Writable)
        );
    }

    #[tokio::test]
    async fn test_staging_state_transitions() {
        let builder = VersionControlBuilder::new();
        let version_control = Arc::new(builder.build());
        let metadata = version_control.current().version.metadata.clone();

        // Create a MitoRegion for testing state transitions.
        let temp_dir = create_temp_dir("");
        let path_str = temp_dir.path().display().to_string();
        let fs_builder = Fs::default().root(&path_str);
        let object_store = ObjectStore::new(fs_builder).unwrap().finish();

        let index_aux_path = temp_dir.path().join("index_aux");
        let puffin_mgr = PuffinManagerFactory::new(&index_aux_path, 4096, None, None)
            .await
            .unwrap();
        let intm_mgr = IntermediateManager::init_fs(index_aux_path.to_str().unwrap())
            .await
            .unwrap();

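        // The access layer bundles the object store with the puffin (index blob)
        // manager and the intermediate manager used while building index files;
        // everything lives under the temporary directory created above.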
        let access_layer = Arc::new(AccessLayer::new(
            "",
            PathType::Bare,
            object_store,
            puffin_mgr,
            intm_mgr,
        ));

        let manager = RegionManifestManager::new(
            metadata.clone(),
            0,
            RegionManifestOptions {
                manifest_dir: "".to_string(),
                object_store: access_layer.object_store().clone(),
                compress_type: CompressionType::Uncompressed,
                checkpoint_distance: 10,
                remove_file_options: Default::default(),
                manifest_cache: None,
            },
            FormatType::PrimaryKey,
            &Default::default(),
        )
        .await
        .unwrap();

        let manifest_ctx = Arc::new(ManifestContext::new(
            manager,
            RegionRoleState::Leader(RegionLeaderState::Writable),
        ));

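        // Most of these fields are inert defaults or test doubles; the state
        // transitions exercised below are driven through `manifest_ctx`.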
        let region = MitoRegion {
            region_id: metadata.region_id,
            version_control,
            access_layer,
            manifest_ctx: manifest_ctx.clone(),
            file_purger: crate::test_util::new_noop_file_purger(),
            provider: Provider::noop_provider(),
            last_flush_millis: Default::default(),
            last_compaction_millis: Default::default(),
            time_provider: Arc::new(StdTimeProvider),
            topic_latest_entry_id: Default::default(),
            written_bytes: Arc::new(AtomicU64::new(0)),
            stats: ManifestStats::default(),
            staging_partition_expr: Mutex::new(None),
        };

        // Test initial state
        assert_eq!(
            region.state(),
            RegionRoleState::Leader(RegionLeaderState::Writable)
        );
        assert!(!region.is_staging());

        // Test transition to staging
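        // `set_staging` needs exclusive access to the manifest manager, so take
        // the write guard explicitly and release it before the lock is acquired
        // again below.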
        let mut manager = manifest_ctx.manifest_manager.write().await;
        region.set_staging(&mut manager).await.unwrap();
        drop(manager);
        assert_eq!(
            region.state(),
            RegionRoleState::Leader(RegionLeaderState::Staging)
        );
        assert!(region.is_staging());

        // Test transition back to writable
        region.exit_staging().unwrap();
        assert_eq!(
            region.state(),
            RegionRoleState::Leader(RegionLeaderState::Writable)
        );
        assert!(!region.is_staging());

        // Test staging directory cleanup: create dirty staging files before entering staging mode
        {
            // Create some dummy staging manifest files to simulate an interrupted session
            let manager = manifest_ctx.manifest_manager.write().await;
            let dummy_actions = RegionMetaActionList::new(vec![]);
            let dummy_bytes = dummy_actions.encode().unwrap();

            // Create dirty staging files with versions 100 and 101
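            // (The trailing `true` argument appears to route these manifests to
            // the staging area, which is why `fetch_staging_manifests` can
            // observe them below.)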
            manager.store().save(100, &dummy_bytes, true).await.unwrap();
            manager.store().save(101, &dummy_bytes, true).await.unwrap();
            drop(manager);

            // Verify dirty files exist before entering staging
            let manager = manifest_ctx.manifest_manager.read().await;
            let dirty_manifests = manager.store().fetch_staging_manifests().await.unwrap();
            assert_eq!(
                dirty_manifests.len(),
                2,
                "Should have 2 dirty staging files"
            );
            drop(manager);

            // Enter staging mode - this should clean up the dirty files
            let mut manager = manifest_ctx.manifest_manager.write().await;
            region.set_staging(&mut manager).await.unwrap();
            drop(manager);

            // Verify dirty files are cleaned up after entering staging
            let manager = manifest_ctx.manifest_manager.read().await;
            let cleaned_manifests = manager.store().fetch_staging_manifests().await.unwrap();
            assert_eq!(
                cleaned_manifests.len(),
                0,
                "Dirty staging files should be cleaned up"
            );
            drop(manager);

            // Exit staging to restore normal state for remaining tests
            region.exit_staging().unwrap();
        }

        // Test invalid transitions
        let mut manager = manifest_ctx.manifest_manager.write().await;
        assert!(region.set_staging(&mut manager).await.is_ok()); // Writable -> Staging should work
        drop(manager);
        let mut manager = manifest_ctx.manifest_manager.write().await;
        assert!(region.set_staging(&mut manager).await.is_err()); // Staging -> Staging should fail
        drop(manager);
        assert!(region.exit_staging().is_ok()); // Staging -> Writable should work
        assert!(region.exit_staging().is_err()); // Writable -> Writable should fail
    }
}