mito2/region.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Mito region.

pub mod catchup;
pub mod opener;
pub mod options;
pub(crate) mod version;

use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
use std::sync::{Arc, Mutex, RwLock};

use common_telemetry::{error, info, warn};
use crossbeam_utils::atomic::AtomicCell;
use snafu::{OptionExt, ensure};
use store_api::ManifestVersion;
use store_api::codec::PrimaryKeyEncoding;
use store_api::logstore::provider::Provider;
use store_api::metadata::RegionMetadataRef;
use store_api::region_engine::{
    RegionManifestInfo, RegionRole, RegionStatistic, SettableRegionRoleState,
};
use store_api::sst_entry::ManifestSstEntry;
use store_api::storage::{RegionId, SequenceNumber};
use tokio::sync::RwLockWriteGuard;

use crate::access_layer::AccessLayerRef;
use crate::error::{
    FlushableRegionStateSnafu, RegionNotFoundSnafu, RegionStateSnafu, RegionTruncatedSnafu, Result,
    UpdateManifestSnafu,
};
use crate::manifest::action::{
    RegionChange, RegionManifest, RegionMetaAction, RegionMetaActionList,
};
use crate::manifest::manager::RegionManifestManager;
use crate::region::version::{VersionControlRef, VersionRef};
use crate::request::{OnFailure, OptionOutputTx};
use crate::sst::file_purger::FilePurgerRef;
use crate::sst::location::{index_file_path, sst_file_path};
use crate::time_provider::TimeProviderRef;

/// Approximate factor used to estimate the WAL size from the memtable size.
const ESTIMATED_WAL_FACTOR: f32 = 0.42825;

/// Usage of a region, including the region id and its WAL, SST and manifest usage.
#[derive(Debug)]
pub struct RegionUsage {
    pub region_id: RegionId,
    pub wal_usage: u64,
    pub sst_usage: u64,
    pub manifest_usage: u64,
}

impl RegionUsage {
    pub fn disk_usage(&self) -> u64 {
        self.wal_usage + self.sst_usage + self.manifest_usage
    }
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RegionLeaderState {
    /// The region is opened and is writable.
    Writable,
    /// The region is in staging mode: writable, but without checkpoints or compactions.
    Staging,
    /// The region is entering staging mode; write requests will be stalled.
    EnteringStaging,
    /// The region is altering.
    Altering,
    /// The region is dropping.
    Dropping,
    /// The region is truncating.
    Truncating,
    /// The region is handling a region edit.
    Editing,
    /// The region is stepping down.
    Downgrading,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RegionRoleState {
    Leader(RegionLeaderState),
    Follower,
}

/// Metadata and runtime status of a region.
///
/// Writing and reading a region follow a single-writer-multi-reader rule:
/// - Only the region worker thread this region belongs to can modify the metadata.
/// - Multiple reader threads are allowed to read a specific `version` of a region.
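///
/// A minimal sketch of the reader side (illustrative):
///
/// ```ignore
/// // Readers take a consistent snapshot of the current version and read
/// // from it without mutating region metadata.
/// let version = region.version();
/// ```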
#[derive(Debug)]
pub struct MitoRegion {
    /// Id of this region.
    ///
    /// Accessing region id from the version control is inconvenient so
    /// we also store it here.
    pub(crate) region_id: RegionId,

    /// Version controller for this region.
    ///
    /// We MUST update the version control inside the write lock of the region manifest manager.
    pub(crate) version_control: VersionControlRef,
    /// SSTs accessor for this region.
    pub(crate) access_layer: AccessLayerRef,
    /// Context to maintain manifest for this region.
    pub(crate) manifest_ctx: ManifestContextRef,
    /// SST file purger.
    pub(crate) file_purger: FilePurgerRef,
    /// The provider of log store.
    pub(crate) provider: Provider,
    /// Last flush time in millis.
    last_flush_millis: AtomicI64,
    /// Last compaction time in millis.
    last_compaction_millis: AtomicI64,
    /// Provider to get current time.
    time_provider: TimeProviderRef,
    /// The topic's latest entry id as of the region's last flush.
    /// **Only used for remote WAL pruning.**
    ///
    /// The value will be updated to the latest offset of the topic
    /// if the region receives a flush request or schedules a periodic flush task
    /// and the region's memtable is empty.
    ///
    /// There are no WAL entries in the range [flushed_entry_id, topic_latest_entry_id] for the current region,
    /// which means these WAL entries may be prunable up to `topic_latest_entry_id`.
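    ///
    /// For example (illustrative numbers): if `flushed_entry_id` is 100 and
    /// `topic_latest_entry_id` is 150, this region holds no data in entries
    /// 100..=150, so the pruner may advance past them up to 150.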
    pub(crate) topic_latest_entry_id: AtomicU64,
    /// The total bytes written to the region.
    pub(crate) written_bytes: Arc<AtomicU64>,
    /// The partition expression of the region in staging mode.
    ///
    /// During the staging mode, the region metadata in [`VersionControlRef`] is not updated,
    /// so we need to store the partition expression separately.
    /// TODO(weny):
    /// 1. Reload the staging partition expr during region open.
    /// 2. Reject requests with a mismatching partition expr.
    pub(crate) staging_partition_expr: Mutex<Option<String>>,
    /// Manifest statistics.
    stats: ManifestStats,
}

pub type MitoRegionRef = Arc<MitoRegion>;

impl MitoRegion {
    /// Stop background managers for this region.
    pub(crate) async fn stop(&self) {
        self.manifest_ctx
            .manifest_manager
            .write()
            .await
            .stop()
            .await;

        info!(
            "Stopped region manifest manager, region_id: {}",
            self.region_id
        );
    }

    /// Returns current metadata of the region.
    pub(crate) fn metadata(&self) -> RegionMetadataRef {
        let version_data = self.version_control.current();
        version_data.version.metadata.clone()
    }

    /// Returns primary key encoding of the region.
    pub(crate) fn primary_key_encoding(&self) -> PrimaryKeyEncoding {
        let version_data = self.version_control.current();
        version_data.version.metadata.primary_key_encoding
    }

    /// Returns current version of the region.
    pub(crate) fn version(&self) -> VersionRef {
        let version_data = self.version_control.current();
        version_data.version
    }

    /// Returns last flush timestamp in millis.
    pub(crate) fn last_flush_millis(&self) -> i64 {
        self.last_flush_millis.load(Ordering::Relaxed)
    }

    /// Update flush time to current time.
    pub(crate) fn update_flush_millis(&self) {
        let now = self.time_provider.current_time_millis();
        self.last_flush_millis.store(now, Ordering::Relaxed);
    }

    /// Returns last compaction timestamp in millis.
    pub(crate) fn last_compaction_millis(&self) -> i64 {
        self.last_compaction_millis.load(Ordering::Relaxed)
    }

    /// Update compaction time to current time.
    pub(crate) fn update_compaction_millis(&self) {
        let now = self.time_provider.current_time_millis();
        self.last_compaction_millis.store(now, Ordering::Relaxed);
    }

    /// Returns the table dir.
    pub(crate) fn table_dir(&self) -> &str {
        self.access_layer.table_dir()
    }

    /// Returns whether the region is writable.
    pub(crate) fn is_writable(&self) -> bool {
        matches!(
            self.manifest_ctx.state.load(),
            RegionRoleState::Leader(RegionLeaderState::Writable)
                | RegionRoleState::Leader(RegionLeaderState::Staging)
        )
    }

    /// Returns whether the region is flushable.
    pub(crate) fn is_flushable(&self) -> bool {
        matches!(
            self.manifest_ctx.state.load(),
            RegionRoleState::Leader(RegionLeaderState::Writable)
                | RegionRoleState::Leader(RegionLeaderState::Staging)
                | RegionRoleState::Leader(RegionLeaderState::Downgrading)
        )
    }

    /// Returns whether the region should abort index building.
    pub(crate) fn should_abort_index(&self) -> bool {
        matches!(
            self.manifest_ctx.state.load(),
            RegionRoleState::Follower
                | RegionRoleState::Leader(RegionLeaderState::Dropping)
                | RegionRoleState::Leader(RegionLeaderState::Truncating)
                | RegionRoleState::Leader(RegionLeaderState::Downgrading)
                | RegionRoleState::Leader(RegionLeaderState::Staging)
        )
    }

    /// Returns whether the region is downgrading.
    pub(crate) fn is_downgrading(&self) -> bool {
        matches!(
            self.manifest_ctx.state.load(),
            RegionRoleState::Leader(RegionLeaderState::Downgrading)
        )
    }

    /// Returns whether the region is in staging mode.
    #[allow(dead_code)]
    pub(crate) fn is_staging(&self) -> bool {
        self.manifest_ctx.state.load() == RegionRoleState::Leader(RegionLeaderState::Staging)
    }

    pub fn region_id(&self) -> RegionId {
        self.region_id
    }

    pub fn find_committed_sequence(&self) -> SequenceNumber {
        self.version_control.committed_sequence()
    }

    /// Returns whether the region is a follower (readonly).
    pub fn is_follower(&self) -> bool {
        self.manifest_ctx.state.load() == RegionRoleState::Follower
    }

    /// Returns the state of the region.
    pub(crate) fn state(&self) -> RegionRoleState {
        self.manifest_ctx.state.load()
    }

    /// Sets the region role state.
    pub(crate) fn set_role(&self, next_role: RegionRole) {
        self.manifest_ctx.set_role(next_role, self.region_id);
    }

    /// Sets the altering state.
    /// You should call this method in the worker loop.
    pub(crate) fn set_altering(&self) -> Result<()> {
        self.compare_exchange_state(
            RegionLeaderState::Writable,
            RegionRoleState::Leader(RegionLeaderState::Altering),
        )
    }

    /// Sets the dropping state.
    /// You should call this method in the worker loop.
    pub(crate) fn set_dropping(&self) -> Result<()> {
        self.compare_exchange_state(
            RegionLeaderState::Writable,
            RegionRoleState::Leader(RegionLeaderState::Dropping),
        )
    }

    /// Sets the truncating state.
    /// You should call this method in the worker loop.
    pub(crate) fn set_truncating(&self) -> Result<()> {
        self.compare_exchange_state(
            RegionLeaderState::Writable,
            RegionRoleState::Leader(RegionLeaderState::Truncating),
        )
    }

    /// Sets the editing state.
    /// You should call this method in the worker loop.
    pub(crate) fn set_editing(&self) -> Result<()> {
        self.compare_exchange_state(
            RegionLeaderState::Writable,
            RegionRoleState::Leader(RegionLeaderState::Editing),
        )
    }

    /// Sets the staging state.
    ///
    /// You should call this method in the worker loop.
    /// Transitions from Writable to Staging state.
    /// Cleans any existing staging manifests before entering staging mode.
    pub(crate) async fn set_staging(
        &self,
        manager: &mut RwLockWriteGuard<'_, RegionManifestManager>,
    ) -> Result<()> {
        manager.store().clear_staging_manifests().await?;

        self.compare_exchange_state(
            RegionLeaderState::Writable,
            RegionRoleState::Leader(RegionLeaderState::Staging),
        )
    }

    /// Sets the entering staging state.
    pub(crate) fn set_entering_staging(&self) -> Result<()> {
        self.compare_exchange_state(
            RegionLeaderState::Writable,
            RegionRoleState::Leader(RegionLeaderState::EnteringStaging),
        )
    }

    /// Exits the staging state back to writable.
    ///
    /// You should call this method in the worker loop.
    /// Transitions from Staging to Writable state.
    pub fn exit_staging(&self) -> Result<()> {
        self.compare_exchange_state(
            RegionLeaderState::Staging,
            RegionRoleState::Leader(RegionLeaderState::Writable),
        )
    }

    /// Sets the region role state gracefully. This acquires the manifest write lock.
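    ///
    /// Illustrative summary of the transitions handled below:
    /// - `StagingLeader`: allowed from `Writable`; no-op if already `Staging`.
    /// - `Leader`: allowed from `Staging` (merges staged manifests); no-op if already `Writable`.
    /// - `Follower` / `DowngradingLeader`: exits staging first if needed, then switches the role.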
    pub(crate) async fn set_role_state_gracefully(
        &self,
        state: SettableRegionRoleState,
    ) -> Result<()> {
        let mut manager = self.manifest_ctx.manifest_manager.write().await;
        let current_state = self.state();

        match state {
            SettableRegionRoleState::Leader => {
                // Exit staging mode and return to normal writable leader
                // Only allowed from staging state
                match current_state {
                    RegionRoleState::Leader(RegionLeaderState::Staging) => {
                        info!("Exiting staging mode for region {}", self.region_id);
                        // Use the success exit path that merges all staged manifests
                        self.exit_staging_on_success(&mut manager).await?;
                    }
                    RegionRoleState::Leader(RegionLeaderState::Writable) => {
                        // Already in desired state - no-op
                        info!("Region {} already in normal leader mode", self.region_id);
                    }
                    _ => {
                        // Only staging -> leader transition is allowed
                        return Err(RegionStateSnafu {
                            region_id: self.region_id,
                            state: current_state,
                            expect: RegionRoleState::Leader(RegionLeaderState::Staging),
                        }
                        .build());
                    }
                }
            }

            SettableRegionRoleState::StagingLeader => {
                // Enter staging mode from normal writable leader
                // Only allowed from writable leader state
                match current_state {
                    RegionRoleState::Leader(RegionLeaderState::Writable) => {
                        info!("Entering staging mode for region {}", self.region_id);
                        self.set_staging(&mut manager).await?;
                    }
                    RegionRoleState::Leader(RegionLeaderState::Staging) => {
                        // Already in desired state - no-op
                        info!("Region {} already in staging mode", self.region_id);
                    }
                    _ => {
                        return Err(RegionStateSnafu {
                            region_id: self.region_id,
                            state: current_state,
                            expect: RegionRoleState::Leader(RegionLeaderState::Writable),
                        }
                        .build());
                    }
                }
            }

            SettableRegionRoleState::Follower => {
                // Make this region a follower
                match current_state {
                    RegionRoleState::Leader(RegionLeaderState::Staging) => {
                        info!(
                            "Exiting staging and demoting region {} to follower",
                            self.region_id
                        );
                        self.exit_staging()?;
                        self.set_role(RegionRole::Follower);
                    }
                    RegionRoleState::Leader(_) => {
                        info!("Demoting region {} from leader to follower", self.region_id);
                        self.set_role(RegionRole::Follower);
                    }
                    RegionRoleState::Follower => {
                        // Already in desired state - no-op
                        info!("Region {} already in follower mode", self.region_id);
                    }
                }
            }

            SettableRegionRoleState::DowngradingLeader => {
                // Downgrade this region to a downgrading leader
                match current_state {
                    RegionRoleState::Leader(RegionLeaderState::Staging) => {
                        info!(
                            "Exiting staging and entering downgrade for region {}",
                            self.region_id
                        );
                        self.exit_staging()?;
                        self.set_role(RegionRole::DowngradingLeader);
                    }
                    RegionRoleState::Leader(RegionLeaderState::Writable) => {
                        info!("Starting downgrade for region {}", self.region_id);
                        self.set_role(RegionRole::DowngradingLeader);
                    }
                    RegionRoleState::Leader(RegionLeaderState::Downgrading) => {
                        // Already in desired state - no-op
                        info!("Region {} already in downgrading mode", self.region_id);
                    }
                    _ => {
                        warn!(
                            "Cannot start downgrade for region {} from state {:?}",
                            self.region_id, current_state
                        );
                    }
                }
            }
        }

        // Hack(zhongzc): If we have just become leader (writable), persist any backfilled metadata.
        if self.state() == RegionRoleState::Leader(RegionLeaderState::Writable) {
            // Persist backfilled metadata if manifest is missing fields (e.g., partition_expr)
            let manifest_meta = &manager.manifest().metadata;
            let current_version = self.version();
            let current_meta = &current_version.metadata;
            if manifest_meta.partition_expr.is_none() && current_meta.partition_expr.is_some() {
                let action = RegionMetaAction::Change(RegionChange {
                    metadata: current_meta.clone(),
                    sst_format: current_version.options.sst_format.unwrap_or_default(),
                });
                let result = manager
                    .update(RegionMetaActionList::with_action(action), false)
                    .await;

                match result {
                    Ok(version) => {
                        info!(
                            "Successfully persisted backfilled metadata for region {}, version: {}",
                            self.region_id, version
                        );
                    }
                    Err(e) => {
                        warn!(e; "Failed to persist backfilled metadata for region {}", self.region_id);
                    }
                }
            }
        }

        drop(manager);

        Ok(())
    }

    /// Switches the region state to `RegionRoleState::Leader(RegionLeaderState::Writable)` if the current state is `expect`.
    /// Otherwise, logs an error.
    pub(crate) fn switch_state_to_writable(&self, expect: RegionLeaderState) {
        if let Err(e) = self
            .compare_exchange_state(expect, RegionRoleState::Leader(RegionLeaderState::Writable))
        {
            error!(e; "failed to switch region state to writable, expect state is {:?}", expect);
        }
    }

    /// Switches the region state to `RegionRoleState::Leader(RegionLeaderState::Staging)` if the current state is `expect`.
    /// Otherwise, logs an error.
    pub(crate) fn switch_state_to_staging(&self, expect: RegionLeaderState) {
        if let Err(e) =
            self.compare_exchange_state(expect, RegionRoleState::Leader(RegionLeaderState::Staging))
        {
            error!(e; "failed to switch region state to staging, expect state is {:?}", expect);
        }
    }

    /// Returns the region statistic.
    pub(crate) fn region_statistic(&self) -> RegionStatistic {
        let version = self.version();
        let memtables = &version.memtables;
        let memtable_usage = (memtables.mutable_usage() + memtables.immutables_usage()) as u64;

        let sst_usage = version.ssts.sst_usage();
        let index_usage = version.ssts.index_usage();
        let flushed_entry_id = version.flushed_entry_id;

        let wal_usage = self.estimated_wal_usage(memtable_usage);
        let manifest_usage = self.stats.total_manifest_size();
        let num_rows = version.ssts.num_rows() + version.memtables.num_rows();
        let num_files = version.ssts.num_files();
        let manifest_version = self.stats.manifest_version();
        let file_removed_cnt = self.stats.file_removed_cnt();

        let topic_latest_entry_id = self.topic_latest_entry_id.load(Ordering::Relaxed);
        let written_bytes = self.written_bytes.load(Ordering::Relaxed);

        RegionStatistic {
            num_rows,
            memtable_size: memtable_usage,
            wal_size: wal_usage,
            manifest_size: manifest_usage,
            sst_size: sst_usage,
            sst_num: num_files,
            index_size: index_usage,
            manifest: RegionManifestInfo::Mito {
                manifest_version,
                flushed_entry_id,
                file_removed_cnt,
            },
            data_topic_latest_entry_id: topic_latest_entry_id,
            metadata_topic_latest_entry_id: topic_latest_entry_id,
            written_bytes,
        }
    }

    /// Estimates the WAL size in bytes.
    /// Uses the memtable size to estimate the size of the WAL.
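    ///
    /// Illustrative arithmetic (the factor is empirical): with 100 MiB of
    /// memtable data, the estimate is `100 MiB * 0.42825 ≈ 42.8 MiB` of WAL.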
    fn estimated_wal_usage(&self, memtable_usage: u64) -> u64 {
        ((memtable_usage as f32) * ESTIMATED_WAL_FACTOR) as u64
    }

    /// Sets the state of the region to the given state if the current state
    /// equals the expected one.
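    ///
    /// A minimal sketch of the intended usage (names as in this file):
    ///
    /// ```ignore
    /// // Transition Writable -> Altering; fails with a region-state error
    /// // if any other state is observed.
    /// region.compare_exchange_state(
    ///     RegionLeaderState::Writable,
    ///     RegionRoleState::Leader(RegionLeaderState::Altering),
    /// )?;
    /// ```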
    fn compare_exchange_state(
        &self,
        expect: RegionLeaderState,
        state: RegionRoleState,
    ) -> Result<()> {
        self.manifest_ctx
            .state
            .compare_exchange(RegionRoleState::Leader(expect), state)
            .map_err(|actual| {
                RegionStateSnafu {
                    region_id: self.region_id,
                    state: actual,
                    expect: RegionRoleState::Leader(expect),
                }
                .build()
            })?;
        Ok(())
    }

    pub fn access_layer(&self) -> AccessLayerRef {
        self.access_layer.clone()
    }

    /// Returns the SST entries of the region.
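    ///
    /// The result combines files from the normal manifest and, when present,
    /// the staging manifest; `visible` marks files currently referenced by the
    /// region's in-memory version.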
    pub async fn manifest_sst_entries(&self) -> Vec<ManifestSstEntry> {
        let table_dir = self.table_dir();
        let path_type = self.access_layer.path_type();

        let visible_ssts = self
            .version()
            .ssts
            .levels()
            .iter()
            .flat_map(|level| level.files().map(|file| file.file_id().file_id()))
            .collect::<HashSet<_>>();

        let manifest_files = self.manifest_ctx.manifest().await.files.clone();
        let staging_files = self
            .manifest_ctx
            .staging_manifest()
            .await
            .map(|m| m.files.clone())
            .unwrap_or_default();
        let files = manifest_files
            .into_iter()
            .chain(staging_files.into_iter())
            .collect::<HashMap<_, _>>();

        files
            .values()
            .map(|meta| {
                let region_id = self.region_id;
                let origin_region_id = meta.region_id;
                let (index_version, index_file_path, index_file_size) = if meta.index_file_size > 0
                {
                    let index_file_path = index_file_path(table_dir, meta.index_id(), path_type);
                    (
                        meta.index_version,
                        Some(index_file_path),
                        Some(meta.index_file_size),
                    )
                } else {
                    (0, None, None)
                };
                let visible = visible_ssts.contains(&meta.file_id);
                ManifestSstEntry {
                    table_dir: table_dir.to_string(),
                    region_id,
                    table_id: region_id.table_id(),
                    region_number: region_id.region_number(),
                    region_group: region_id.region_group(),
                    region_sequence: region_id.region_sequence(),
                    file_id: meta.file_id.to_string(),
                    index_version,
                    level: meta.level,
                    file_path: sst_file_path(table_dir, meta.file_id(), path_type),
                    file_size: meta.file_size,
                    index_file_path,
                    index_file_size,
                    num_rows: meta.num_rows,
                    num_row_groups: meta.num_row_groups,
                    num_series: Some(meta.num_series),
                    min_ts: meta.time_range.0,
                    max_ts: meta.time_range.1,
                    sequence: meta.sequence.map(|s| s.get()),
                    origin_region_id,
                    node_id: None,
                    visible,
                }
            })
            .collect()
    }

    /// Exit staging mode successfully by merging all staged manifests and making them visible.
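    ///
    /// Illustrative summary of the steps below:
    /// 1. Ensure the region is in the `Staging` state.
    /// 2. Merge the staged manifest actions; if there are none, just exit staging.
    /// 3. Persist the merged actions to the normal (non-staging) manifest directory.
    /// 4. Apply the merged edit to the in-memory version control.
    /// 5. Clear the staging manifests and switch the state back to `Writable`.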
    pub(crate) async fn exit_staging_on_success(
        &self,
        manager: &mut RwLockWriteGuard<'_, RegionManifestManager>,
    ) -> Result<()> {
        let current_state = self.manifest_ctx.current_state();
        ensure!(
            current_state == RegionRoleState::Leader(RegionLeaderState::Staging),
            RegionStateSnafu {
                region_id: self.region_id,
                state: current_state,
                expect: RegionRoleState::Leader(RegionLeaderState::Staging),
            }
        );

        // Merge all staged manifest actions
        let merged_actions = match manager.merge_staged_actions(current_state).await? {
            Some(actions) => actions,
            None => {
                info!(
                    "No staged manifests to merge for region {}, exiting staging mode without changes",
                    self.region_id
                );
                // Even if there are no manifests to merge, we still need to exit staging mode
                self.exit_staging()?;
                return Ok(());
            }
        };

        // Submit merged actions using the manifest manager's update method.
        // Pass `false` so it saves to the normal directory, not staging.
        let new_version = manager.update(merged_actions.clone(), false).await?;

        info!(
            "Successfully submitted merged staged manifests for region {}, new version: {}",
            self.region_id, new_version
        );

        // Apply the merged changes to the in-memory version control
        let merged_edit = merged_actions.into_region_edit();
        self.version_control
            .apply_edit(Some(merged_edit), &[], self.file_purger.clone());

        // Clear all staging manifests and transition the state
        manager.store().clear_staging_manifests().await?;
        self.exit_staging()?;

        Ok(())
    }
}

/// Context to update the region manifest.
#[derive(Debug)]
pub(crate) struct ManifestContext {
    /// Manager to maintain manifest for this region.
    pub(crate) manifest_manager: tokio::sync::RwLock<RegionManifestManager>,
    /// The state of the region. The region checks the state before updating
    /// the manifest.
    state: AtomicCell<RegionRoleState>,
}

impl ManifestContext {
    pub(crate) fn new(manager: RegionManifestManager, state: RegionRoleState) -> Self {
        ManifestContext {
            manifest_manager: tokio::sync::RwLock::new(manager),
            state: AtomicCell::new(state),
        }
    }

    pub(crate) async fn manifest_version(&self) -> ManifestVersion {
        self.manifest_manager
            .read()
            .await
            .manifest()
            .manifest_version
    }

    pub(crate) async fn has_update(&self) -> Result<bool> {
        self.manifest_manager.read().await.has_update().await
    }

    /// Returns the current region role state.
    pub(crate) fn current_state(&self) -> RegionRoleState {
        self.state.load()
    }

    /// Installs the manifest changes from the current version to the target version (inclusive).
    ///
    /// Returns the installed [RegionManifest].
    /// **Note**: This function is not guaranteed to install the target version strictly.
    /// The installed version may be greater than the target version.
    pub(crate) async fn install_manifest_to(
        &self,
        version: ManifestVersion,
    ) -> Result<Arc<RegionManifest>> {
        let mut manager = self.manifest_manager.write().await;
        manager.install_manifest_to(version).await?;

        Ok(manager.manifest())
    }

    /// Updates the manifest if the current state is `expect_state`.
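    ///
    /// Illustrative summary: the update is rejected unless the region is in
    /// `expect_state` (a downgrading leader may still flush and update the
    /// manifest), and edits that reference truncated entries or already
    /// removed files fail with a region-truncated error.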
    pub(crate) async fn update_manifest(
        &self,
        expect_state: RegionLeaderState,
        action_list: RegionMetaActionList,
        is_staging: bool,
    ) -> Result<ManifestVersion> {
        // Acquires the write lock of the manifest manager.
        let mut manager = self.manifest_manager.write().await;
        // Gets current manifest.
        let manifest = manager.manifest();
        // Checks state inside the lock. This is to ensure that we won't update the manifest
        // after `set_role_state_gracefully()` is called.
        let current_state = self.state.load();

        // If expect_state is not downgrading, the current state must be either `expect_state` or downgrading.
        //
        // A downgrading leader rejects user writes but still allows
        // flushing the memtable and updating the manifest.
        if expect_state != RegionLeaderState::Downgrading {
            if current_state == RegionRoleState::Leader(RegionLeaderState::Downgrading) {
                info!(
                    "Region {} is in downgrading leader state, updating manifest. state is {:?}",
                    manifest.metadata.region_id, expect_state
                );
            }
            ensure!(
                current_state == RegionRoleState::Leader(expect_state)
                    || current_state == RegionRoleState::Leader(RegionLeaderState::Downgrading),
                UpdateManifestSnafu {
                    region_id: manifest.metadata.region_id,
                    state: current_state,
                }
            );
        } else {
            ensure!(
                current_state == RegionRoleState::Leader(expect_state),
                RegionStateSnafu {
                    region_id: manifest.metadata.region_id,
                    state: current_state,
                    expect: RegionRoleState::Leader(expect_state),
                }
            );
        }

        for action in &action_list.actions {
            // Checks whether the edit is still applicable.
            let RegionMetaAction::Edit(edit) = &action else {
                continue;
            };

            // Checks whether the region is truncated.
            let Some(truncated_entry_id) = manifest.truncated_entry_id else {
                continue;
            };

            // This is an edit from flush.
            if let Some(flushed_entry_id) = edit.flushed_entry_id {
                ensure!(
                    truncated_entry_id < flushed_entry_id,
                    RegionTruncatedSnafu {
                        region_id: manifest.metadata.region_id,
                    }
                );
            }

            // This is an edit from compaction.
            if !edit.files_to_remove.is_empty() {
                // Ensures the input files of the compaction task haven't been truncated.
                for file in &edit.files_to_remove {
                    ensure!(
                        manifest.files.contains_key(&file.file_id),
                        RegionTruncatedSnafu {
                            region_id: manifest.metadata.region_id,
                        }
                    );
                }
            }
        }

        // Now we can update the manifest.
        let version = manager.update(action_list, is_staging).await.inspect_err(
            |e| error!(e; "Failed to update manifest, region_id: {}", manifest.metadata.region_id),
        )?;

        if self.state.load() == RegionRoleState::Follower {
            warn!(
                "Region {} becomes follower while updating manifest which may cause inconsistency, manifest version: {version}",
                manifest.metadata.region_id
            );
        }

        Ok(version)
    }

    /// Sets the [`RegionRole`].
    ///
    /// ```text
    ///     +------------------------------------------+
    ///     |                      +-----------------+ |
    ///     |                      |                 | |
    /// +---+------+       +-------+-----+        +--v-v---+
    /// | Follower |       | Downgrading |        | Leader |
    /// +---^-^----+       +-----+-^-----+        +--+-+---+
    ///     | |                  | |                 | |
    ///     | +------------------+ +-----------------+ |
    ///     +------------------------------------------+
    ///
    /// Transition:
    /// - Follower -> Leader
    /// - Downgrading Leader -> Leader
    /// - Leader -> Follower
    /// - Downgrading Leader -> Follower
    /// - Leader -> Downgrading Leader
    ///
    /// ```
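    ///
    /// A minimal sketch (mirrors `test_set_region_state` below):
    ///
    /// ```ignore
    /// ctx.set_role(RegionRole::DowngradingLeader, region_id);
    /// assert_eq!(
    ///     ctx.current_state(),
    ///     RegionRoleState::Leader(RegionLeaderState::Downgrading)
    /// );
    /// ctx.set_role(RegionRole::Follower, region_id);
    /// assert_eq!(ctx.current_state(), RegionRoleState::Follower);
    /// ```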
    pub(crate) fn set_role(&self, next_role: RegionRole, region_id: RegionId) {
        match next_role {
            RegionRole::Follower => {
                match self.state.fetch_update(|state| {
                    if !matches!(state, RegionRoleState::Follower) {
                        Some(RegionRoleState::Follower)
                    } else {
                        None
                    }
                }) {
                    Ok(state) => info!(
                        "Convert region {} to follower, previous role state: {:?}",
                        region_id, state
                    ),
                    Err(state) => {
                        if state != RegionRoleState::Follower {
                            warn!(
                                "Failed to convert region {} to follower, current role state: {:?}",
                                region_id, state
                            )
                        }
                    }
                }
            }
            RegionRole::Leader => {
                match self.state.fetch_update(|state| {
                    if matches!(
                        state,
                        RegionRoleState::Follower
                            | RegionRoleState::Leader(RegionLeaderState::Downgrading)
                    ) {
                        Some(RegionRoleState::Leader(RegionLeaderState::Writable))
                    } else {
                        None
                    }
                }) {
                    Ok(state) => info!(
                        "Convert region {} to leader, previous role state: {:?}",
                        region_id, state
                    ),
                    Err(state) => {
                        if state != RegionRoleState::Leader(RegionLeaderState::Writable) {
                            warn!(
                                "Failed to convert region {} to leader, current role state: {:?}",
                                region_id, state
                            )
                        }
                    }
                }
            }
            RegionRole::DowngradingLeader => {
                match self.state.compare_exchange(
                    RegionRoleState::Leader(RegionLeaderState::Writable),
                    RegionRoleState::Leader(RegionLeaderState::Downgrading),
                ) {
                    Ok(state) => info!(
                        "Convert region {} to downgrading region, previous role state: {:?}",
                        region_id, state
                    ),
                    Err(state) => {
                        if state != RegionRoleState::Leader(RegionLeaderState::Downgrading) {
                            warn!(
                                "Failed to convert region {} to downgrading leader, current role state: {:?}",
                                region_id, state
                            )
                        }
                    }
                }
            }
        }
    }

    /// Returns the normal manifest of the region.
    pub(crate) async fn manifest(&self) -> Arc<crate::manifest::action::RegionManifest> {
        self.manifest_manager.read().await.manifest()
    }

    /// Returns the staging manifest of the region.
    pub(crate) async fn staging_manifest(
        &self,
    ) -> Option<Arc<crate::manifest::action::RegionManifest>> {
        self.manifest_manager.read().await.staging_manifest()
    }
}

pub(crate) type ManifestContextRef = Arc<ManifestContext>;

/// Regions indexed by ids.
#[derive(Debug, Default)]
pub(crate) struct RegionMap {
    regions: RwLock<HashMap<RegionId, MitoRegionRef>>,
}

impl RegionMap {
    /// Returns true if the region exists.
    pub(crate) fn is_region_exists(&self, region_id: RegionId) -> bool {
        let regions = self.regions.read().unwrap();
        regions.contains_key(&region_id)
    }

    /// Inserts a new region into the map.
    pub(crate) fn insert_region(&self, region: MitoRegionRef) {
        let mut regions = self.regions.write().unwrap();
        regions.insert(region.region_id, region);
    }

    /// Gets region by region id.
    pub(crate) fn get_region(&self, region_id: RegionId) -> Option<MitoRegionRef> {
        let regions = self.regions.read().unwrap();
        regions.get(&region_id).cloned()
    }

    /// Gets writable region by region id.
    ///
    /// Returns error if the region does not exist or is readonly.
    pub(crate) fn writable_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
        let region = self
            .get_region(region_id)
            .context(RegionNotFoundSnafu { region_id })?;
        ensure!(
            region.is_writable(),
            RegionStateSnafu {
                region_id,
                state: region.state(),
                expect: RegionRoleState::Leader(RegionLeaderState::Writable),
            }
        );
        Ok(region)
    }

    /// Gets readonly region by region id.
    ///
    /// Returns error if the region does not exist or is writable.
    pub(crate) fn follower_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
        let region = self
            .get_region(region_id)
            .context(RegionNotFoundSnafu { region_id })?;
        ensure!(
            region.is_follower(),
            RegionStateSnafu {
                region_id,
                state: region.state(),
                expect: RegionRoleState::Follower,
            }
        );

        Ok(region)
    }

    /// Gets region by region id.
    ///
    /// Calls the callback if the region does not exist.
    pub(crate) fn get_region_or<F: OnFailure>(
        &self,
        region_id: RegionId,
        cb: &mut F,
    ) -> Option<MitoRegionRef> {
        match self
            .get_region(region_id)
            .context(RegionNotFoundSnafu { region_id })
        {
            Ok(region) => Some(region),
            Err(e) => {
                cb.on_failure(e);
                None
            }
        }
    }

    /// Gets writable region by region id.
    ///
    /// Calls the callback if the region does not exist or is readonly.
    pub(crate) fn writable_region_or<F: OnFailure>(
        &self,
        region_id: RegionId,
        cb: &mut F,
    ) -> Option<MitoRegionRef> {
        match self.writable_region(region_id) {
            Ok(region) => Some(region),
            Err(e) => {
                cb.on_failure(e);
                None
            }
        }
    }

    /// Gets writable non-staging region by region id.
    ///
    /// Returns error if the region does not exist, is readonly, or is in staging mode.
    pub(crate) fn writable_non_staging_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
        let region = self.writable_region(region_id)?;
        if region.is_staging() {
            return Err(crate::error::RegionStateSnafu {
                region_id,
                state: region.state(),
                expect: RegionRoleState::Leader(RegionLeaderState::Writable),
            }
            .build());
        }
        Ok(region)
    }

    /// Gets staging region by region id.
    ///
    /// Returns error if the region does not exist or is not in staging state.
    pub(crate) fn staging_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
        let region = self
            .get_region(region_id)
            .context(RegionNotFoundSnafu { region_id })?;
        ensure!(
            region.is_staging(),
            RegionStateSnafu {
                region_id,
                state: region.state(),
                expect: RegionRoleState::Leader(RegionLeaderState::Staging),
            }
        );
        Ok(region)
    }

    /// Gets flushable region by region id.
    ///
    /// Returns error if the region does not exist or is not operable.
    fn flushable_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
        let region = self
            .get_region(region_id)
            .context(RegionNotFoundSnafu { region_id })?;
        ensure!(
            region.is_flushable(),
            FlushableRegionStateSnafu {
                region_id,
                state: region.state(),
            }
        );
        Ok(region)
    }

    /// Gets flushable region by region id.
    ///
    /// Calls the callback if the region does not exist or is not operable.
    pub(crate) fn flushable_region_or<F: OnFailure>(
        &self,
        region_id: RegionId,
        cb: &mut F,
    ) -> Option<MitoRegionRef> {
        match self.flushable_region(region_id) {
            Ok(region) => Some(region),
            Err(e) => {
                cb.on_failure(e);
                None
            }
        }
    }

    /// Remove region by id.
    pub(crate) fn remove_region(&self, region_id: RegionId) {
        let mut regions = self.regions.write().unwrap();
        regions.remove(&region_id);
    }

    /// List all regions.
    pub(crate) fn list_regions(&self) -> Vec<MitoRegionRef> {
        let regions = self.regions.read().unwrap();
        regions.values().cloned().collect()
    }

    /// Clear the map.
    pub(crate) fn clear(&self) {
        self.regions.write().unwrap().clear();
    }
}

pub(crate) type RegionMapRef = Arc<RegionMap>;

/// Regions that are being opened, with senders waiting for the open results.
#[derive(Debug, Default)]
pub(crate) struct OpeningRegions {
    regions: RwLock<HashMap<RegionId, Vec<OptionOutputTx>>>,
}

impl OpeningRegions {
    /// Registers `sender` for an opening region and returns `None`;
    /// otherwise (the region is not opening), returns the `sender` back to the caller.
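    ///
    /// A minimal sketch of the expected flow (illustrative):
    ///
    /// ```ignore
    /// if let Some(sender) = opening_regions.wait_for_opening_region(region_id, sender) {
    ///     // The region is not being opened; the caller keeps the sender
    ///     // and handles the request itself.
    /// }
    /// ```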
    pub(crate) fn wait_for_opening_region(
        &self,
        region_id: RegionId,
        sender: OptionOutputTx,
    ) -> Option<OptionOutputTx> {
        let mut regions = self.regions.write().unwrap();
        match regions.entry(region_id) {
            Entry::Occupied(mut senders) => {
                senders.get_mut().push(sender);
                None
            }
            Entry::Vacant(_) => Some(sender),
        }
    }

    /// Returns true if the region exists.
    pub(crate) fn is_region_exists(&self, region_id: RegionId) -> bool {
        let regions = self.regions.read().unwrap();
        regions.contains_key(&region_id)
    }

    /// Inserts a new region into the map.
    pub(crate) fn insert_sender(&self, region: RegionId, sender: OptionOutputTx) {
        let mut regions = self.regions.write().unwrap();
        regions.insert(region, vec![sender]);
    }

    /// Remove region by id.
    pub(crate) fn remove_sender(&self, region_id: RegionId) -> Vec<OptionOutputTx> {
        let mut regions = self.regions.write().unwrap();
        regions.remove(&region_id).unwrap_or_default()
    }

    #[cfg(test)]
    pub(crate) fn sender_len(&self, region_id: RegionId) -> usize {
        let regions = self.regions.read().unwrap();
        if let Some(senders) = regions.get(&region_id) {
            senders.len()
        } else {
            0
        }
    }
}

pub(crate) type OpeningRegionsRef = Arc<OpeningRegions>;

/// The regions that are catching up.
#[derive(Debug, Default)]
pub(crate) struct CatchupRegions {
    regions: RwLock<HashSet<RegionId>>,
}

impl CatchupRegions {
    /// Returns true if the region exists.
    pub(crate) fn is_region_exists(&self, region_id: RegionId) -> bool {
        let regions = self.regions.read().unwrap();
        regions.contains(&region_id)
    }

    /// Inserts a new region into the set.
    pub(crate) fn insert_region(&self, region_id: RegionId) {
        let mut regions = self.regions.write().unwrap();
        regions.insert(region_id);
    }

    /// Remove region by id.
    pub(crate) fn remove_region(&self, region_id: RegionId) {
        let mut regions = self.regions.write().unwrap();
        regions.remove(&region_id);
    }
}

pub(crate) type CatchupRegionsRef = Arc<CatchupRegions>;

/// Manifest stats.
#[derive(Default, Debug, Clone)]
pub struct ManifestStats {
    pub(crate) total_manifest_size: Arc<AtomicU64>,
    pub(crate) manifest_version: Arc<AtomicU64>,
    pub(crate) file_removed_cnt: Arc<AtomicU64>,
}

impl ManifestStats {
    fn total_manifest_size(&self) -> u64 {
        self.total_manifest_size.load(Ordering::Relaxed)
    }

    fn manifest_version(&self) -> u64 {
        self.manifest_version.load(Ordering::Relaxed)
    }

    fn file_removed_cnt(&self) -> u64 {
        self.file_removed_cnt.load(Ordering::Relaxed)
    }
}

#[cfg(test)]
mod tests {
    use std::sync::atomic::AtomicU64;
    use std::sync::{Arc, Mutex};

    use common_datasource::compression::CompressionType;
    use common_test_util::temp_dir::create_temp_dir;
    use crossbeam_utils::atomic::AtomicCell;
    use object_store::ObjectStore;
    use object_store::services::Fs;
    use store_api::logstore::provider::Provider;
    use store_api::region_engine::RegionRole;
    use store_api::region_request::PathType;
    use store_api::storage::RegionId;

    use crate::access_layer::AccessLayer;
    use crate::manifest::action::RegionMetaActionList;
    use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
    use crate::region::{
        ManifestContext, ManifestStats, MitoRegion, RegionLeaderState, RegionRoleState,
    };
    use crate::sst::FormatType;
    use crate::sst::index::intermediate::IntermediateManager;
    use crate::sst::index::puffin_manager::PuffinManagerFactory;
    use crate::test_util::scheduler_util::SchedulerEnv;
    use crate::test_util::version_util::VersionControlBuilder;
    use crate::time_provider::StdTimeProvider;

    #[test]
    fn test_region_state_lock_free() {
        assert!(AtomicCell::<RegionRoleState>::is_lock_free());
    }

    #[tokio::test]
    async fn test_set_region_state() {
        let env = SchedulerEnv::new().await;
        let builder = VersionControlBuilder::new();
        let version_control = Arc::new(builder.build());
        let manifest_ctx = env
            .mock_manifest_context(version_control.current().version.metadata.clone())
            .await;

        let region_id = RegionId::new(1024, 0);
        // Leader -> Follower
        manifest_ctx.set_role(RegionRole::Follower, region_id);
        assert_eq!(manifest_ctx.state.load(), RegionRoleState::Follower);

        // Follower -> Leader
        manifest_ctx.set_role(RegionRole::Leader, region_id);
        assert_eq!(
            manifest_ctx.state.load(),
            RegionRoleState::Leader(RegionLeaderState::Writable)
        );

        // Leader -> Downgrading Leader
        manifest_ctx.set_role(RegionRole::DowngradingLeader, region_id);
        assert_eq!(
            manifest_ctx.state.load(),
            RegionRoleState::Leader(RegionLeaderState::Downgrading)
        );

        // Downgrading Leader -> Follower
        manifest_ctx.set_role(RegionRole::Follower, region_id);
        assert_eq!(manifest_ctx.state.load(), RegionRoleState::Follower);

        // Can't downgrade from follower (Follower -> Downgrading Leader)
        manifest_ctx.set_role(RegionRole::DowngradingLeader, region_id);
        assert_eq!(manifest_ctx.state.load(), RegionRoleState::Follower);

        // Set region role to Downgrading Leader
        manifest_ctx.set_role(RegionRole::Leader, region_id);
        manifest_ctx.set_role(RegionRole::DowngradingLeader, region_id);
        assert_eq!(
            manifest_ctx.state.load(),
            RegionRoleState::Leader(RegionLeaderState::Downgrading)
        );

        // Downgrading Leader -> Leader
        manifest_ctx.set_role(RegionRole::Leader, region_id);
        assert_eq!(
            manifest_ctx.state.load(),
            RegionRoleState::Leader(RegionLeaderState::Writable)
        );
    }

    #[tokio::test]
    async fn test_staging_state_validation() {
        let env = SchedulerEnv::new().await;
        let builder = VersionControlBuilder::new();
        let version_control = Arc::new(builder.build());

        // Create context with staging state using the correct pattern from SchedulerEnv
        let staging_ctx = {
            let manager = RegionManifestManager::new(
                version_control.current().version.metadata.clone(),
                0,
                RegionManifestOptions {
                    manifest_dir: "".to_string(),
                    object_store: env.access_layer.object_store().clone(),
                    compress_type: CompressionType::Uncompressed,
                    checkpoint_distance: 10,
                    remove_file_options: Default::default(),
                    manifest_cache: None,
                },
                FormatType::PrimaryKey,
                &Default::default(),
            )
            .await
            .unwrap();
            Arc::new(ManifestContext::new(
                manager,
                RegionRoleState::Leader(RegionLeaderState::Staging),
            ))
        };

        // Test staging state behavior
        assert_eq!(
            staging_ctx.current_state(),
            RegionRoleState::Leader(RegionLeaderState::Staging)
        );

        // Test writable context for comparison
        let writable_ctx = env
            .mock_manifest_context(version_control.current().version.metadata.clone())
            .await;

        assert_eq!(
            writable_ctx.current_state(),
            RegionRoleState::Leader(RegionLeaderState::Writable)
        );
    }

    #[tokio::test]
    async fn test_staging_state_transitions() {
        let builder = VersionControlBuilder::new();
        let version_control = Arc::new(builder.build());
        let metadata = version_control.current().version.metadata.clone();

        // Create a MitoRegion for testing state transitions
        let temp_dir = create_temp_dir("");
        let path_str = temp_dir.path().display().to_string();
        let fs_builder = Fs::default().root(&path_str);
        let object_store = ObjectStore::new(fs_builder).unwrap().finish();

        let index_aux_path = temp_dir.path().join("index_aux");
        let puffin_mgr = PuffinManagerFactory::new(&index_aux_path, 4096, None, None)
            .await
            .unwrap();
        let intm_mgr = IntermediateManager::init_fs(index_aux_path.to_str().unwrap())
            .await
            .unwrap();

        let access_layer = Arc::new(AccessLayer::new(
            "",
            PathType::Bare,
            object_store,
            puffin_mgr,
            intm_mgr,
        ));

        let manager = RegionManifestManager::new(
            metadata.clone(),
            0,
            RegionManifestOptions {
                manifest_dir: "".to_string(),
                object_store: access_layer.object_store().clone(),
                compress_type: CompressionType::Uncompressed,
                checkpoint_distance: 10,
                remove_file_options: Default::default(),
                manifest_cache: None,
            },
            FormatType::PrimaryKey,
            &Default::default(),
        )
        .await
        .unwrap();

        let manifest_ctx = Arc::new(ManifestContext::new(
            manager,
            RegionRoleState::Leader(RegionLeaderState::Writable),
        ));

        let region = MitoRegion {
            region_id: metadata.region_id,
            version_control,
            access_layer,
            manifest_ctx: manifest_ctx.clone(),
            file_purger: crate::test_util::new_noop_file_purger(),
            provider: Provider::noop_provider(),
            last_flush_millis: Default::default(),
            last_compaction_millis: Default::default(),
            time_provider: Arc::new(StdTimeProvider),
            topic_latest_entry_id: Default::default(),
            written_bytes: Arc::new(AtomicU64::new(0)),
            stats: ManifestStats::default(),
            staging_partition_expr: Mutex::new(None),
        };

        // Test initial state
        assert_eq!(
            region.state(),
            RegionRoleState::Leader(RegionLeaderState::Writable)
        );
        assert!(!region.is_staging());

        // Test transition to staging
        let mut manager = manifest_ctx.manifest_manager.write().await;
        region.set_staging(&mut manager).await.unwrap();
        drop(manager);
        assert_eq!(
            region.state(),
            RegionRoleState::Leader(RegionLeaderState::Staging)
        );
        assert!(region.is_staging());

        // Test transition back to writable
        region.exit_staging().unwrap();
        assert_eq!(
            region.state(),
            RegionRoleState::Leader(RegionLeaderState::Writable)
        );
        assert!(!region.is_staging());

        // Test staging directory cleanup: create dirty staging files before entering staging mode
        {
            // Create some dummy staging manifest files to simulate an interrupted session
            let manager = manifest_ctx.manifest_manager.write().await;
            let dummy_actions = RegionMetaActionList::new(vec![]);
            let dummy_bytes = dummy_actions.encode().unwrap();

            // Create dirty staging files with versions 100 and 101
            manager.store().save(100, &dummy_bytes, true).await.unwrap();
            manager.store().save(101, &dummy_bytes, true).await.unwrap();
            drop(manager);

            // Verify dirty files exist before entering staging
            let manager = manifest_ctx.manifest_manager.read().await;
            let dirty_manifests = manager.store().fetch_staging_manifests().await.unwrap();
            assert_eq!(
                dirty_manifests.len(),
                2,
                "Should have 2 dirty staging files"
            );
            drop(manager);

            // Enter staging mode - this should clean up the dirty files
            let mut manager = manifest_ctx.manifest_manager.write().await;
            region.set_staging(&mut manager).await.unwrap();
            drop(manager);

            // Verify dirty files are cleaned up after entering staging
            let manager = manifest_ctx.manifest_manager.read().await;
            let cleaned_manifests = manager.store().fetch_staging_manifests().await.unwrap();
            assert_eq!(
                cleaned_manifests.len(),
                0,
                "Dirty staging files should be cleaned up"
            );
            drop(manager);

            // Exit staging to restore normal state for remaining tests
            region.exit_staging().unwrap();
        }

        // Test invalid transitions
        let mut manager = manifest_ctx.manifest_manager.write().await;
        assert!(region.set_staging(&mut manager).await.is_ok()); // Writable -> Staging should work
        drop(manager);
        let mut manager = manifest_ctx.manifest_manager.write().await;
        assert!(region.set_staging(&mut manager).await.is_err()); // Staging -> Staging should fail
        drop(manager);
        assert!(region.exit_staging().is_ok()); // Staging -> Writable should work
        assert!(region.exit_staging().is_err()); // Writable -> Writable should fail
    }
}
1529}