mito2/region.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Mito region.

pub mod catchup;
pub mod opener;
pub mod options;
pub(crate) mod version;

use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
use std::sync::{Arc, RwLock};

use common_telemetry::{error, info, warn};
use crossbeam_utils::atomic::AtomicCell;
use snafu::{OptionExt, ensure};
use store_api::ManifestVersion;
use store_api::codec::PrimaryKeyEncoding;
use store_api::logstore::provider::Provider;
use store_api::metadata::RegionMetadataRef;
use store_api::region_engine::{
    RegionManifestInfo, RegionRole, RegionStatistic, SettableRegionRoleState,
};
use store_api::sst_entry::ManifestSstEntry;
use store_api::storage::{RegionId, SequenceNumber};
use tokio::sync::RwLockWriteGuard;

use crate::access_layer::AccessLayerRef;
use crate::error::{
    FlushableRegionStateSnafu, RegionNotFoundSnafu, RegionStateSnafu, RegionTruncatedSnafu, Result,
    UpdateManifestSnafu,
};
use crate::manifest::action::{
    RegionChange, RegionManifest, RegionMetaAction, RegionMetaActionList,
};
use crate::manifest::manager::RegionManifestManager;
use crate::memtable::MemtableBuilderRef;
use crate::region::version::{VersionControlRef, VersionRef};
use crate::request::{OnFailure, OptionOutputTx};
use crate::sst::FormatType;
use crate::sst::file_purger::FilePurgerRef;
use crate::sst::location::{index_file_path, sst_file_path};
use crate::time_provider::TimeProviderRef;

/// The approximate factor used to estimate the WAL size from the memtable size.
const ESTIMATED_WAL_FACTOR: f32 = 0.42825;

/// Region usage, including the region id, WAL usage, SST usage and manifest usage.
#[derive(Debug)]
pub struct RegionUsage {
    pub region_id: RegionId,
    pub wal_usage: u64,
    pub sst_usage: u64,
    pub manifest_usage: u64,
}

impl RegionUsage {
    pub fn disk_usage(&self) -> u64 {
        self.wal_usage + self.sst_usage + self.manifest_usage
    }
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RegionLeaderState {
    /// The region is opened and is writable.
    Writable,
    /// The region is in staging mode - writable but no checkpoint/compaction.
    Staging,
    /// The region is altering.
    Altering,
    /// The region is dropping.
    Dropping,
    /// The region is truncating.
    Truncating,
    /// The region is handling a region edit.
    Editing,
    /// The region is stepping down.
    Downgrading,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RegionRoleState {
    Leader(RegionLeaderState),
    Follower,
}

/// Metadata and runtime status of a region.
///
/// Writing and reading a region follow a single-writer-multi-reader rule:
/// - Only the region worker thread that this region belongs to can modify the metadata.
/// - Multiple reader threads are allowed to read a specific `version` of a region.
#[derive(Debug)]
pub struct MitoRegion {
    /// Id of this region.
    ///
    /// Accessing region id from the version control is inconvenient so
    /// we also store it here.
    pub(crate) region_id: RegionId,

    /// Version controller for this region.
    ///
    /// We MUST update the version control inside the write lock of the region manifest manager.
    pub(crate) version_control: VersionControlRef,
    /// SSTs accessor for this region.
    pub(crate) access_layer: AccessLayerRef,
    /// Context to maintain manifest for this region.
    pub(crate) manifest_ctx: ManifestContextRef,
    /// SST file purger.
    pub(crate) file_purger: FilePurgerRef,
    /// The provider of log store.
    pub(crate) provider: Provider,
    /// Last flush time in millis.
    last_flush_millis: AtomicI64,
    /// Last compaction time in millis.
    last_compaction_millis: AtomicI64,
    /// Provider to get current time.
    time_provider: TimeProviderRef,
    /// The topic's latest entry id since the region's last flush.
    /// **Only used for remote WAL pruning.**
    ///
    /// The value is updated to the latest offset of the topic
    /// if the region receives a flush request or schedules a periodic flush task
    /// while the region's memtable is empty.
    ///
    /// There are no WAL entries in the range [flushed_entry_id, topic_latest_entry_id] for the current region,
    /// which means the WAL entries may be pruned up to `topic_latest_entry_id`.
    pub(crate) topic_latest_entry_id: AtomicU64,
    /// The total bytes written to the region.
    pub(crate) written_bytes: Arc<AtomicU64>,
    /// Memtable builder for the region.
    pub(crate) memtable_builder: MemtableBuilderRef,
    /// Format type of the SST file.
    pub(crate) sst_format: FormatType,
    /// Manifest stats.
    stats: ManifestStats,
}

pub type MitoRegionRef = Arc<MitoRegion>;

impl MitoRegion {
    /// Stops background managers for this region.
    pub(crate) async fn stop(&self) {
        self.manifest_ctx
            .manifest_manager
            .write()
            .await
            .stop()
            .await;

        info!(
            "Stopped region manifest manager, region_id: {}",
            self.region_id
        );
    }

    /// Returns current metadata of the region.
    pub(crate) fn metadata(&self) -> RegionMetadataRef {
        let version_data = self.version_control.current();
        version_data.version.metadata.clone()
    }

    /// Returns primary key encoding of the region.
    pub(crate) fn primary_key_encoding(&self) -> PrimaryKeyEncoding {
        let version_data = self.version_control.current();
        version_data.version.metadata.primary_key_encoding
    }

    /// Returns current version of the region.
    pub(crate) fn version(&self) -> VersionRef {
        let version_data = self.version_control.current();
        version_data.version
    }

    /// Returns last flush timestamp in millis.
    pub(crate) fn last_flush_millis(&self) -> i64 {
        self.last_flush_millis.load(Ordering::Relaxed)
    }

    /// Updates the flush time to the current time.
    pub(crate) fn update_flush_millis(&self) {
        let now = self.time_provider.current_time_millis();
        self.last_flush_millis.store(now, Ordering::Relaxed);
    }

    /// Returns last compaction timestamp in millis.
    pub(crate) fn last_compaction_millis(&self) -> i64 {
        self.last_compaction_millis.load(Ordering::Relaxed)
    }

    /// Returns format type of the SST file.
    pub(crate) fn sst_format(&self) -> FormatType {
        self.sst_format
    }

    /// Updates the compaction time to the current time.
    pub(crate) fn update_compaction_millis(&self) {
        let now = self.time_provider.current_time_millis();
        self.last_compaction_millis.store(now, Ordering::Relaxed);
    }

    /// Returns the table dir.
    pub(crate) fn table_dir(&self) -> &str {
        self.access_layer.table_dir()
    }

    /// Returns whether the region is writable.
    pub(crate) fn is_writable(&self) -> bool {
        matches!(
            self.manifest_ctx.state.load(),
            RegionRoleState::Leader(RegionLeaderState::Writable)
                | RegionRoleState::Leader(RegionLeaderState::Staging)
        )
    }

    /// Returns whether the region is flushable.
    pub(crate) fn is_flushable(&self) -> bool {
        matches!(
            self.manifest_ctx.state.load(),
            RegionRoleState::Leader(RegionLeaderState::Writable)
                | RegionRoleState::Leader(RegionLeaderState::Staging)
                | RegionRoleState::Leader(RegionLeaderState::Downgrading)
        )
    }

    /// Returns whether the region should abort index building.
    pub(crate) fn should_abort_index(&self) -> bool {
        matches!(
            self.manifest_ctx.state.load(),
            RegionRoleState::Follower
                | RegionRoleState::Leader(RegionLeaderState::Dropping)
                | RegionRoleState::Leader(RegionLeaderState::Truncating)
                | RegionRoleState::Leader(RegionLeaderState::Downgrading)
                | RegionRoleState::Leader(RegionLeaderState::Staging)
        )
    }

    /// Returns whether the region is downgrading.
    pub(crate) fn is_downgrading(&self) -> bool {
        matches!(
            self.manifest_ctx.state.load(),
            RegionRoleState::Leader(RegionLeaderState::Downgrading)
        )
    }

    /// Returns whether the region is in staging mode.
    #[allow(dead_code)]
    pub(crate) fn is_staging(&self) -> bool {
        self.manifest_ctx.state.load() == RegionRoleState::Leader(RegionLeaderState::Staging)
    }

    /// Returns the region id.
    pub fn region_id(&self) -> RegionId {
        self.region_id
    }

    /// Returns the committed sequence of the region.
    pub fn find_committed_sequence(&self) -> SequenceNumber {
        self.version_control.committed_sequence()
    }

    /// Returns whether the region is a follower (read-only).
    pub fn is_follower(&self) -> bool {
        self.manifest_ctx.state.load() == RegionRoleState::Follower
    }

    /// Returns the state of the region.
    pub(crate) fn state(&self) -> RegionRoleState {
        self.manifest_ctx.state.load()
    }

    /// Sets the region role state.
    pub(crate) fn set_role(&self, next_role: RegionRole) {
        self.manifest_ctx.set_role(next_role, self.region_id);
    }

    /// Sets the altering state.
    /// You should call this method in the worker loop.
    pub(crate) fn set_altering(&self) -> Result<()> {
        self.compare_exchange_state(
            RegionLeaderState::Writable,
            RegionRoleState::Leader(RegionLeaderState::Altering),
        )
    }

    /// Sets the dropping state.
    /// You should call this method in the worker loop.
    pub(crate) fn set_dropping(&self) -> Result<()> {
        self.compare_exchange_state(
            RegionLeaderState::Writable,
            RegionRoleState::Leader(RegionLeaderState::Dropping),
        )
    }

    /// Sets the truncating state.
    /// You should call this method in the worker loop.
    pub(crate) fn set_truncating(&self) -> Result<()> {
        self.compare_exchange_state(
            RegionLeaderState::Writable,
            RegionRoleState::Leader(RegionLeaderState::Truncating),
        )
    }

    /// Sets the editing state.
    /// You should call this method in the worker loop.
    pub(crate) fn set_editing(&self) -> Result<()> {
        self.compare_exchange_state(
            RegionLeaderState::Writable,
            RegionRoleState::Leader(RegionLeaderState::Editing),
        )
    }

    /// Sets the staging state.
    ///
    /// You should call this method in the worker loop.
    /// Transitions from the Writable to the Staging state.
    /// Cleans any existing staging manifests before entering staging mode.
    pub(crate) async fn set_staging(
        &self,
        manager: &mut RwLockWriteGuard<'_, RegionManifestManager>,
    ) -> Result<()> {
        manager.store().clear_staging_manifests().await?;

        self.compare_exchange_state(
            RegionLeaderState::Writable,
            RegionRoleState::Leader(RegionLeaderState::Staging),
        )
    }

    /// Exits the staging state back to writable.
    ///
    /// You should call this method in the worker loop.
    /// Transitions from the Staging to the Writable state.
    fn exit_staging(&self) -> Result<()> {
        self.compare_exchange_state(
            RegionLeaderState::Staging,
            RegionRoleState::Leader(RegionLeaderState::Writable),
        )
    }

    /// Sets the region role state gracefully. This acquires the manifest write lock.
    pub(crate) async fn set_role_state_gracefully(
        &self,
        state: SettableRegionRoleState,
    ) -> Result<()> {
        let mut manager = self.manifest_ctx.manifest_manager.write().await;
        let current_state = self.state();

        match state {
            SettableRegionRoleState::Leader => {
                // Exit staging mode and return to normal writable leader.
                // Only allowed from the staging state.
                match current_state {
                    RegionRoleState::Leader(RegionLeaderState::Staging) => {
                        info!("Exiting staging mode for region {}", self.region_id);
                        // Use the success exit path that merges all staged manifests.
                        self.exit_staging_on_success(&mut manager).await?;
                    }
                    RegionRoleState::Leader(RegionLeaderState::Writable) => {
                        // Already in the desired state - no-op.
                        info!("Region {} already in normal leader mode", self.region_id);
                    }
                    _ => {
                        // Only the staging -> leader transition is allowed.
                        return Err(RegionStateSnafu {
                            region_id: self.region_id,
                            state: current_state,
                            expect: RegionRoleState::Leader(RegionLeaderState::Staging),
                        }
                        .build());
                    }
                }
            }

            SettableRegionRoleState::StagingLeader => {
                // Enter staging mode from normal writable leader.
                // Only allowed from the writable leader state.
                match current_state {
                    RegionRoleState::Leader(RegionLeaderState::Writable) => {
                        info!("Entering staging mode for region {}", self.region_id);
                        self.set_staging(&mut manager).await?;
                    }
                    RegionRoleState::Leader(RegionLeaderState::Staging) => {
                        // Already in the desired state - no-op.
                        info!("Region {} already in staging mode", self.region_id);
                    }
                    _ => {
                        return Err(RegionStateSnafu {
                            region_id: self.region_id,
                            state: current_state,
                            expect: RegionRoleState::Leader(RegionLeaderState::Writable),
                        }
                        .build());
                    }
                }
            }

            SettableRegionRoleState::Follower => {
                // Make this region a follower.
                match current_state {
                    RegionRoleState::Leader(RegionLeaderState::Staging) => {
                        info!(
                            "Exiting staging and demoting region {} to follower",
                            self.region_id
                        );
                        self.exit_staging()?;
                        self.set_role(RegionRole::Follower);
                    }
                    RegionRoleState::Leader(_) => {
                        info!("Demoting region {} from leader to follower", self.region_id);
                        self.set_role(RegionRole::Follower);
                    }
                    RegionRoleState::Follower => {
                        // Already in the desired state - no-op.
                        info!("Region {} already in follower mode", self.region_id);
                    }
                }
            }

            SettableRegionRoleState::DowngradingLeader => {
                // Downgrade this region to a downgrading leader.
                match current_state {
                    RegionRoleState::Leader(RegionLeaderState::Staging) => {
                        info!(
                            "Exiting staging and entering downgrade for region {}",
                            self.region_id
                        );
                        self.exit_staging()?;
                        self.set_role(RegionRole::DowngradingLeader);
                    }
                    RegionRoleState::Leader(RegionLeaderState::Writable) => {
                        info!("Starting downgrade for region {}", self.region_id);
                        self.set_role(RegionRole::DowngradingLeader);
                    }
                    RegionRoleState::Leader(RegionLeaderState::Downgrading) => {
                        // Already in the desired state - no-op.
                        info!("Region {} already in downgrading mode", self.region_id);
                    }
                    _ => {
                        warn!(
                            "Cannot start downgrade for region {} from state {:?}",
                            self.region_id, current_state
                        );
                    }
                }
            }
        }

        // Hack(zhongzc): If we have just become leader (writable), persist any backfilled metadata.
        if self.state() == RegionRoleState::Leader(RegionLeaderState::Writable) {
            // Persist backfilled metadata if the manifest is missing fields (e.g., partition_expr).
            let manifest_meta = &manager.manifest().metadata;
            let current_meta = &self.version().metadata;
            if manifest_meta.partition_expr.is_none() && current_meta.partition_expr.is_some() {
                let action = RegionMetaAction::Change(RegionChange {
                    metadata: current_meta.clone(),
                    sst_format: self.sst_format(),
                });
                let result = manager
                    .update(
                        RegionMetaActionList::with_action(action),
                        RegionRoleState::Leader(RegionLeaderState::Writable),
                    )
                    .await;

                match result {
                    Ok(version) => {
                        info!(
                            "Successfully persisted backfilled metadata for region {}, version: {}",
                            self.region_id, version
                        );
                    }
                    Err(e) => {
                        warn!(e; "Failed to persist backfilled metadata for region {}", self.region_id);
                    }
                }
            }
        }

        drop(manager);

        Ok(())
    }

    /// Switches the region state to `RegionRoleState::Leader(RegionLeaderState::Writable)` if the current state is `expect`.
    /// Otherwise, logs an error.
    pub(crate) fn switch_state_to_writable(&self, expect: RegionLeaderState) {
        if let Err(e) = self
            .compare_exchange_state(expect, RegionRoleState::Leader(RegionLeaderState::Writable))
        {
            error!(e; "failed to switch region state to writable, expect state is {:?}", expect);
        }
    }

    /// Returns the region statistic.
    pub(crate) fn region_statistic(&self) -> RegionStatistic {
        let version = self.version();
        let memtables = &version.memtables;
        let memtable_usage = (memtables.mutable_usage() + memtables.immutables_usage()) as u64;

        let sst_usage = version.ssts.sst_usage();
        let index_usage = version.ssts.index_usage();
        let flushed_entry_id = version.flushed_entry_id;

        let wal_usage = self.estimated_wal_usage(memtable_usage);
        let manifest_usage = self.stats.total_manifest_size();
        let num_rows = version.ssts.num_rows() + version.memtables.num_rows();
        let num_files = version.ssts.num_files();
        let manifest_version = self.stats.manifest_version();

        let topic_latest_entry_id = self.topic_latest_entry_id.load(Ordering::Relaxed);
        let written_bytes = self.written_bytes.load(Ordering::Relaxed);

        RegionStatistic {
            num_rows,
            memtable_size: memtable_usage,
            wal_size: wal_usage,
            manifest_size: manifest_usage,
            sst_size: sst_usage,
            sst_num: num_files,
            index_size: index_usage,
            manifest: RegionManifestInfo::Mito {
                manifest_version,
                flushed_entry_id,
            },
            data_topic_latest_entry_id: topic_latest_entry_id,
            metadata_topic_latest_entry_id: topic_latest_entry_id,
            written_bytes,
        }
    }

    /// Estimated WAL size in bytes.
    /// Uses the memtable size to estimate the WAL size.
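    /// For example (illustrative values), a 100 MiB memtable yields an estimate of
    /// roughly `100 MiB * 0.42825 ≈ 42.8 MiB` of WAL usage.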
    fn estimated_wal_usage(&self, memtable_usage: u64) -> u64 {
        ((memtable_usage as f32) * ESTIMATED_WAL_FACTOR) as u64
    }

    /// Sets the state of the region to the given state if the current state equals
    /// the expected one.
    fn compare_exchange_state(
        &self,
        expect: RegionLeaderState,
        state: RegionRoleState,
    ) -> Result<()> {
        self.manifest_ctx
            .state
            .compare_exchange(RegionRoleState::Leader(expect), state)
            .map_err(|actual| {
                RegionStateSnafu {
                    region_id: self.region_id,
                    state: actual,
                    expect: RegionRoleState::Leader(expect),
                }
                .build()
            })?;
        Ok(())
    }

    /// Returns the access layer of the region.
    pub fn access_layer(&self) -> AccessLayerRef {
        self.access_layer.clone()
    }

    /// Returns the SST entries of the region.
    pub async fn manifest_sst_entries(&self) -> Vec<ManifestSstEntry> {
        let table_dir = self.table_dir();
        let path_type = self.access_layer.path_type();

        let visible_ssts = self
            .version()
            .ssts
            .levels()
            .iter()
            .flat_map(|level| level.files().map(|file| file.file_id().file_id()))
            .collect::<HashSet<_>>();

        self.manifest_ctx
            .manifest()
            .await
            .files
            .values()
            .map(|meta| {
                let region_id = self.region_id;
                let origin_region_id = meta.region_id;
                let (index_file_id, index_file_path, index_file_size) = if meta.index_file_size > 0
                {
                    let index_file_path =
                        index_file_path(table_dir, meta.index_file_id(), path_type);
                    (
                        Some(meta.index_file_id().file_id().to_string()),
                        Some(index_file_path),
                        Some(meta.index_file_size),
                    )
                } else {
                    (None, None, None)
                };
                let visible = visible_ssts.contains(&meta.file_id);
                ManifestSstEntry {
                    table_dir: table_dir.to_string(),
                    region_id,
                    table_id: region_id.table_id(),
                    region_number: region_id.region_number(),
                    region_group: region_id.region_group(),
                    region_sequence: region_id.region_sequence(),
                    file_id: meta.file_id.to_string(),
                    index_file_id,
                    level: meta.level,
                    file_path: sst_file_path(table_dir, meta.file_id(), path_type),
                    file_size: meta.file_size,
                    index_file_path,
                    index_file_size,
                    num_rows: meta.num_rows,
                    num_row_groups: meta.num_row_groups,
                    num_series: Some(meta.num_series),
                    min_ts: meta.time_range.0,
                    max_ts: meta.time_range.1,
                    sequence: meta.sequence.map(|s| s.get()),
                    origin_region_id,
                    node_id: None,
                    visible,
                }
            })
            .collect()
    }

    /// Exits staging mode successfully by merging all staged manifests and making them visible.
    pub(crate) async fn exit_staging_on_success(
        &self,
        manager: &mut RwLockWriteGuard<'_, RegionManifestManager>,
    ) -> Result<()> {
        let current_state = self.manifest_ctx.current_state();
        ensure!(
            current_state == RegionRoleState::Leader(RegionLeaderState::Staging),
            RegionStateSnafu {
                region_id: self.region_id,
                state: current_state,
                expect: RegionRoleState::Leader(RegionLeaderState::Staging),
            }
        );

        // Merge all staged manifest actions.
        let merged_actions = match manager.merge_staged_actions(current_state).await? {
            Some(actions) => actions,
            None => {
                info!(
                    "No staged manifests to merge for region {}, exiting staging mode without changes",
                    self.region_id
                );
                // Even if there are no manifests to merge, we still need to exit staging mode.
                self.exit_staging()?;
                return Ok(());
            }
        };

        // Submit the merged actions using the manifest manager's update method.
        // Pass the target state (Writable) so it saves to the normal directory, not the staging one.
        let target_state = RegionRoleState::Leader(RegionLeaderState::Writable);
        let new_version = manager.update(merged_actions.clone(), target_state).await?;

        info!(
            "Successfully submitted merged staged manifests for region {}, new version: {}",
            self.region_id, new_version
        );

        // Apply the merged changes to the in-memory version control.
        let merged_edit = merged_actions.into_region_edit();
        self.version_control
            .apply_edit(Some(merged_edit), &[], self.file_purger.clone());

        // Clear all staging manifests and transition the state.
        manager.store().clear_staging_manifests().await?;
        self.exit_staging()?;

        Ok(())
    }
}

/// Context to update the region manifest.
#[derive(Debug)]
pub(crate) struct ManifestContext {
    /// Manager to maintain manifest for this region.
    pub(crate) manifest_manager: tokio::sync::RwLock<RegionManifestManager>,
    /// The state of the region. The region checks the state before updating
    /// manifest.
    state: AtomicCell<RegionRoleState>,
}

impl ManifestContext {
    pub(crate) fn new(manager: RegionManifestManager, state: RegionRoleState) -> Self {
        ManifestContext {
            manifest_manager: tokio::sync::RwLock::new(manager),
            state: AtomicCell::new(state),
        }
    }

    pub(crate) async fn manifest_version(&self) -> ManifestVersion {
        self.manifest_manager
            .read()
            .await
            .manifest()
            .manifest_version
    }

    pub(crate) async fn has_update(&self) -> Result<bool> {
        self.manifest_manager.read().await.has_update().await
    }

    /// Returns the current region role state.
    pub(crate) fn current_state(&self) -> RegionRoleState {
        self.state.load()
    }

    /// Installs the manifest changes from the current version to the target version (inclusive).
    ///
    /// Returns installed [RegionManifest].
    /// **Note**: This function is not guaranteed to install the target version strictly.
    /// The installed version may be greater than the target version.
    pub(crate) async fn install_manifest_to(
        &self,
        version: ManifestVersion,
    ) -> Result<Arc<RegionManifest>> {
        let mut manager = self.manifest_manager.write().await;
        manager.install_manifest_to(version).await?;

        Ok(manager.manifest())
    }

    /// Updates the manifest if current state is `expect_state`.
    pub(crate) async fn update_manifest(
        &self,
        expect_state: RegionLeaderState,
        action_list: RegionMetaActionList,
    ) -> Result<ManifestVersion> {
        // Acquires the write lock of the manifest manager.
        let mut manager = self.manifest_manager.write().await;
        // Gets current manifest.
        let manifest = manager.manifest();
        // Checks state inside the lock. This is to ensure that we won't update the manifest
        // after `set_readonly_gracefully()` is called.
        let current_state = self.state.load();

        // If expect_state is not downgrading, the current state must be either `expect_state` or downgrading.
        //
        // A downgrading leader rejects user writes but still allows
        // flushing the memtable and updating the manifest.
        if expect_state != RegionLeaderState::Downgrading {
            if current_state == RegionRoleState::Leader(RegionLeaderState::Downgrading) {
                info!(
                    "Region {} is in downgrading leader state, updating manifest. state is {:?}",
                    manifest.metadata.region_id, expect_state
                );
            }
            ensure!(
                current_state == RegionRoleState::Leader(expect_state)
                    || current_state == RegionRoleState::Leader(RegionLeaderState::Downgrading),
                UpdateManifestSnafu {
                    region_id: manifest.metadata.region_id,
                    state: current_state,
                }
            );
        } else {
            ensure!(
                current_state == RegionRoleState::Leader(expect_state),
                RegionStateSnafu {
                    region_id: manifest.metadata.region_id,
                    state: current_state,
                    expect: RegionRoleState::Leader(expect_state),
                }
            );
        }

        for action in &action_list.actions {
            // Checks whether the edit is still applicable.
            let RegionMetaAction::Edit(edit) = &action else {
                continue;
            };

            // Checks whether the region is truncated.
            let Some(truncated_entry_id) = manifest.truncated_entry_id else {
                continue;
            };

            // This is an edit from flush.
            if let Some(flushed_entry_id) = edit.flushed_entry_id {
                ensure!(
                    truncated_entry_id < flushed_entry_id,
                    RegionTruncatedSnafu {
                        region_id: manifest.metadata.region_id,
                    }
                );
            }

            // This is an edit from compaction.
            if !edit.files_to_remove.is_empty() {
                // Ensures the input files of the compaction task have not been truncated.
                for file in &edit.files_to_remove {
                    ensure!(
                        manifest.files.contains_key(&file.file_id),
                        RegionTruncatedSnafu {
                            region_id: manifest.metadata.region_id,
                        }
                    );
                }
            }
        }

        // Now we can update the manifest.
        let version = manager.update(action_list, current_state).await.inspect_err(
            |e| error!(e; "Failed to update manifest, region_id: {}", manifest.metadata.region_id),
        )?;

        if self.state.load() == RegionRoleState::Follower {
            warn!(
                "Region {} becomes follower while updating manifest which may cause inconsistency, manifest version: {version}",
                manifest.metadata.region_id
            );
        }

        Ok(version)
    }

    /// Sets the [`RegionRole`].
    ///
    /// ```text
    ///     +------------------------------------------+
    ///     |                      +-----------------+ |
    ///     |                      |                 | |
    /// +---+------+       +-------+-----+        +--v-v---+
    /// | Follower |       | Downgrading |        | Leader |
    /// +---^-^----+       +-----+-^-----+        +--+-+---+
    ///     | |                  | |                 | |
    ///     | +------------------+ +-----------------+ |
    ///     +------------------------------------------+
    ///
    /// Transition:
    /// - Follower -> Leader
    /// - Downgrading Leader -> Leader
    /// - Leader -> Follower
    /// - Downgrading Leader -> Follower
    /// - Leader -> Downgrading Leader
    ///
    /// ```
    pub(crate) fn set_role(&self, next_role: RegionRole, region_id: RegionId) {
        match next_role {
            RegionRole::Follower => {
                match self.state.fetch_update(|state| {
                    if !matches!(state, RegionRoleState::Follower) {
                        Some(RegionRoleState::Follower)
                    } else {
                        None
                    }
                }) {
                    Ok(state) => info!(
                        "Convert region {} to follower, previous role state: {:?}",
                        region_id, state
                    ),
                    Err(state) => {
                        if state != RegionRoleState::Follower {
                            warn!(
                                "Failed to convert region {} to follower, current role state: {:?}",
                                region_id, state
                            )
                        }
                    }
                }
            }
            RegionRole::Leader => {
                match self.state.fetch_update(|state| {
                    if matches!(
                        state,
                        RegionRoleState::Follower
                            | RegionRoleState::Leader(RegionLeaderState::Downgrading)
                    ) {
                        Some(RegionRoleState::Leader(RegionLeaderState::Writable))
                    } else {
                        None
                    }
                }) {
                    Ok(state) => info!(
                        "Convert region {} to leader, previous role state: {:?}",
                        region_id, state
                    ),
                    Err(state) => {
                        if state != RegionRoleState::Leader(RegionLeaderState::Writable) {
                            warn!(
                                "Failed to convert region {} to leader, current role state: {:?}",
                                region_id, state
                            )
                        }
                    }
                }
            }
            RegionRole::DowngradingLeader => {
                match self.state.compare_exchange(
                    RegionRoleState::Leader(RegionLeaderState::Writable),
                    RegionRoleState::Leader(RegionLeaderState::Downgrading),
                ) {
                    Ok(state) => info!(
                        "Convert region {} to downgrading region, previous role state: {:?}",
                        region_id, state
                    ),
                    Err(state) => {
                        if state != RegionRoleState::Leader(RegionLeaderState::Downgrading) {
                            warn!(
                                "Failed to convert region {} to downgrading leader, current role state: {:?}",
                                region_id, state
                            )
                        }
                    }
                }
            }
        }
    }

    pub(crate) async fn manifest(&self) -> Arc<crate::manifest::action::RegionManifest> {
        self.manifest_manager.read().await.manifest()
    }
}

pub(crate) type ManifestContextRef = Arc<ManifestContext>;

/// Regions indexed by ids.
#[derive(Debug, Default)]
pub(crate) struct RegionMap {
    regions: RwLock<HashMap<RegionId, MitoRegionRef>>,
}

impl RegionMap {
    /// Returns true if the region exists.
    pub(crate) fn is_region_exists(&self, region_id: RegionId) -> bool {
        let regions = self.regions.read().unwrap();
        regions.contains_key(&region_id)
    }

    /// Inserts a new region into the map.
    pub(crate) fn insert_region(&self, region: MitoRegionRef) {
        let mut regions = self.regions.write().unwrap();
        regions.insert(region.region_id, region);
    }

    /// Gets region by region id.
    pub(crate) fn get_region(&self, region_id: RegionId) -> Option<MitoRegionRef> {
        let regions = self.regions.read().unwrap();
        regions.get(&region_id).cloned()
    }

    /// Gets writable region by region id.
    ///
    /// Returns error if the region does not exist or is readonly.
    pub(crate) fn writable_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
        let region = self
            .get_region(region_id)
            .context(RegionNotFoundSnafu { region_id })?;
        ensure!(
            region.is_writable(),
            RegionStateSnafu {
                region_id,
                state: region.state(),
                expect: RegionRoleState::Leader(RegionLeaderState::Writable),
            }
        );
        Ok(region)
    }

    /// Gets readonly region by region id.
    ///
    /// Returns error if the region does not exist or is writable.
    pub(crate) fn follower_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
        let region = self
            .get_region(region_id)
            .context(RegionNotFoundSnafu { region_id })?;
        ensure!(
            region.is_follower(),
            RegionStateSnafu {
                region_id,
                state: region.state(),
                expect: RegionRoleState::Follower,
            }
        );

        Ok(region)
    }

    /// Gets region by region id.
    ///
    /// Calls the callback if the region does not exist.
    pub(crate) fn get_region_or<F: OnFailure>(
        &self,
        region_id: RegionId,
        cb: &mut F,
    ) -> Option<MitoRegionRef> {
        match self
            .get_region(region_id)
            .context(RegionNotFoundSnafu { region_id })
        {
            Ok(region) => Some(region),
            Err(e) => {
                cb.on_failure(e);
                None
            }
        }
    }

    /// Gets writable region by region id.
    ///
    /// Calls the callback if the region does not exist or is readonly.
    pub(crate) fn writable_region_or<F: OnFailure>(
        &self,
        region_id: RegionId,
        cb: &mut F,
    ) -> Option<MitoRegionRef> {
        match self.writable_region(region_id) {
            Ok(region) => Some(region),
            Err(e) => {
                cb.on_failure(e);
                None
            }
        }
    }

    /// Gets writable non-staging region by region id.
    ///
    /// Returns error if the region does not exist, is readonly, or is in staging mode.
    pub(crate) fn writable_non_staging_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
        let region = self.writable_region(region_id)?;
        if region.is_staging() {
            return Err(crate::error::RegionStateSnafu {
                region_id,
                state: region.state(),
                expect: RegionRoleState::Leader(RegionLeaderState::Writable),
            }
            .build());
        }
        Ok(region)
    }

    /// Gets flushable region by region id.
    ///
    /// Returns error if the region does not exist or is not operable.
    fn flushable_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
        let region = self
            .get_region(region_id)
            .context(RegionNotFoundSnafu { region_id })?;
        ensure!(
            region.is_flushable(),
            FlushableRegionStateSnafu {
                region_id,
                state: region.state(),
            }
        );
        Ok(region)
    }

    /// Gets flushable region by region id.
    ///
    /// Calls the callback if the region does not exist or is not operable.
    pub(crate) fn flushable_region_or<F: OnFailure>(
        &self,
        region_id: RegionId,
        cb: &mut F,
    ) -> Option<MitoRegionRef> {
        match self.flushable_region(region_id) {
            Ok(region) => Some(region),
            Err(e) => {
                cb.on_failure(e);
                None
            }
        }
    }

    /// Remove region by id.
    pub(crate) fn remove_region(&self, region_id: RegionId) {
        let mut regions = self.regions.write().unwrap();
        regions.remove(&region_id);
    }

    /// List all regions.
    pub(crate) fn list_regions(&self) -> Vec<MitoRegionRef> {
        let regions = self.regions.read().unwrap();
        regions.values().cloned().collect()
    }

    /// Clear the map.
    pub(crate) fn clear(&self) {
        self.regions.write().unwrap().clear();
    }
}

pub(crate) type RegionMapRef = Arc<RegionMap>;
/// Regions that are being opened.
#[derive(Debug, Default)]
pub(crate) struct OpeningRegions {
    regions: RwLock<HashMap<RegionId, Vec<OptionOutputTx>>>,
}

impl OpeningRegions {
    /// Registers `sender` for an opening region and returns `None` if the region is
    /// already being opened; otherwise, returns the `sender` back to the caller.
    pub(crate) fn wait_for_opening_region(
        &self,
        region_id: RegionId,
        sender: OptionOutputTx,
    ) -> Option<OptionOutputTx> {
        let mut regions = self.regions.write().unwrap();
        match regions.entry(region_id) {
            Entry::Occupied(mut senders) => {
                senders.get_mut().push(sender);
                None
            }
            Entry::Vacant(_) => Some(sender),
        }
    }

    /// Returns true if the region exists.
    pub(crate) fn is_region_exists(&self, region_id: RegionId) -> bool {
        let regions = self.regions.read().unwrap();
        regions.contains_key(&region_id)
    }

    /// Inserts the sender for a new opening region into the map.
    pub(crate) fn insert_sender(&self, region: RegionId, sender: OptionOutputTx) {
        let mut regions = self.regions.write().unwrap();
        regions.insert(region, vec![sender]);
    }

    /// Remove region by id.
    pub(crate) fn remove_sender(&self, region_id: RegionId) -> Vec<OptionOutputTx> {
        let mut regions = self.regions.write().unwrap();
        regions.remove(&region_id).unwrap_or_default()
    }

    #[cfg(test)]
    pub(crate) fn sender_len(&self, region_id: RegionId) -> usize {
        let regions = self.regions.read().unwrap();
        if let Some(senders) = regions.get(&region_id) {
            senders.len()
        } else {
            0
        }
    }
}

pub(crate) type OpeningRegionsRef = Arc<OpeningRegions>;

/// The regions that are catching up.
#[derive(Debug, Default)]
pub(crate) struct CatchupRegions {
    regions: RwLock<HashSet<RegionId>>,
}

impl CatchupRegions {
    /// Returns true if the region exists.
    pub(crate) fn is_region_exists(&self, region_id: RegionId) -> bool {
        let regions = self.regions.read().unwrap();
        regions.contains(&region_id)
    }

    /// Inserts a new region into the set.
    pub(crate) fn insert_region(&self, region_id: RegionId) {
        let mut regions = self.regions.write().unwrap();
        regions.insert(region_id);
    }

    /// Remove region by id.
    pub(crate) fn remove_region(&self, region_id: RegionId) {
        let mut regions = self.regions.write().unwrap();
        regions.remove(&region_id);
    }
}

pub(crate) type CatchupRegionsRef = Arc<CatchupRegions>;

/// Manifest stats.
#[derive(Default, Debug, Clone)]
pub(crate) struct ManifestStats {
    total_manifest_size: Arc<AtomicU64>,
    manifest_version: Arc<AtomicU64>,
}

impl ManifestStats {
    fn total_manifest_size(&self) -> u64 {
        self.total_manifest_size.load(Ordering::Relaxed)
    }

    fn manifest_version(&self) -> u64 {
        self.manifest_version.load(Ordering::Relaxed)
    }
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::sync::atomic::AtomicU64;

    use common_datasource::compression::CompressionType;
    use common_test_util::temp_dir::create_temp_dir;
    use crossbeam_utils::atomic::AtomicCell;
    use object_store::ObjectStore;
    use object_store::services::Fs;
    use store_api::logstore::provider::Provider;
    use store_api::region_engine::RegionRole;
    use store_api::region_request::PathType;
    use store_api::storage::RegionId;

    use crate::access_layer::AccessLayer;
    use crate::manifest::action::RegionMetaActionList;
    use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
    use crate::region::{
        ManifestContext, ManifestStats, MitoRegion, RegionLeaderState, RegionRoleState,
    };
    use crate::sst::FormatType;
    use crate::sst::index::intermediate::IntermediateManager;
    use crate::sst::index::puffin_manager::PuffinManagerFactory;
    use crate::test_util::memtable_util::EmptyMemtableBuilder;
    use crate::test_util::scheduler_util::SchedulerEnv;
    use crate::test_util::version_util::VersionControlBuilder;
    use crate::time_provider::StdTimeProvider;

    #[test]
    fn test_region_state_lock_free() {
        assert!(AtomicCell::<RegionRoleState>::is_lock_free());
    }

    #[tokio::test]
    async fn test_set_region_state() {
        let env = SchedulerEnv::new().await;
        let builder = VersionControlBuilder::new();
        let version_control = Arc::new(builder.build());
        let manifest_ctx = env
            .mock_manifest_context(version_control.current().version.metadata.clone())
            .await;

        let region_id = RegionId::new(1024, 0);
        // Leader -> Follower
        manifest_ctx.set_role(RegionRole::Follower, region_id);
        assert_eq!(manifest_ctx.state.load(), RegionRoleState::Follower);

        // Follower -> Leader
        manifest_ctx.set_role(RegionRole::Leader, region_id);
        assert_eq!(
            manifest_ctx.state.load(),
            RegionRoleState::Leader(RegionLeaderState::Writable)
        );

        // Leader -> Downgrading Leader
        manifest_ctx.set_role(RegionRole::DowngradingLeader, region_id);
        assert_eq!(
            manifest_ctx.state.load(),
            RegionRoleState::Leader(RegionLeaderState::Downgrading)
        );

        // Downgrading Leader -> Follower
        manifest_ctx.set_role(RegionRole::Follower, region_id);
        assert_eq!(manifest_ctx.state.load(), RegionRoleState::Follower);

        // Can't downgrade from follower (Follower -> Downgrading Leader)
        manifest_ctx.set_role(RegionRole::DowngradingLeader, region_id);
        assert_eq!(manifest_ctx.state.load(), RegionRoleState::Follower);

        // Set region role to Downgrading Leader
        manifest_ctx.set_role(RegionRole::Leader, region_id);
        manifest_ctx.set_role(RegionRole::DowngradingLeader, region_id);
        assert_eq!(
            manifest_ctx.state.load(),
            RegionRoleState::Leader(RegionLeaderState::Downgrading)
        );

        // Downgrading Leader -> Leader
        manifest_ctx.set_role(RegionRole::Leader, region_id);
        assert_eq!(
            manifest_ctx.state.load(),
            RegionRoleState::Leader(RegionLeaderState::Writable)
        );
    }

    #[tokio::test]
    async fn test_staging_state_validation() {
        let env = SchedulerEnv::new().await;
        let builder = VersionControlBuilder::new();
        let version_control = Arc::new(builder.build());

        // Create a context with the staging state using the pattern from SchedulerEnv
        let staging_ctx = {
            let manager = RegionManifestManager::new(
                version_control.current().version.metadata.clone(),
                0,
                RegionManifestOptions {
                    manifest_dir: "".to_string(),
                    object_store: env.access_layer.object_store().clone(),
                    compress_type: CompressionType::Uncompressed,
                    checkpoint_distance: 10,
                    remove_file_options: Default::default(),
                },
                Default::default(),
                Default::default(),
                FormatType::PrimaryKey,
            )
            .await
            .unwrap();
            Arc::new(ManifestContext::new(
                manager,
                RegionRoleState::Leader(RegionLeaderState::Staging),
            ))
        };

        // Test staging state behavior
        assert_eq!(
            staging_ctx.current_state(),
            RegionRoleState::Leader(RegionLeaderState::Staging)
        );

        // Test a writable context for comparison
        let writable_ctx = env
            .mock_manifest_context(version_control.current().version.metadata.clone())
            .await;

        assert_eq!(
            writable_ctx.current_state(),
            RegionRoleState::Leader(RegionLeaderState::Writable)
        );
    }

    #[tokio::test]
    async fn test_staging_state_transitions() {
        let builder = VersionControlBuilder::new();
        let version_control = Arc::new(builder.build());
        let metadata = version_control.current().version.metadata.clone();

        // Create a MitoRegion for testing state transitions
        let temp_dir = create_temp_dir("");
        let path_str = temp_dir.path().display().to_string();
        let fs_builder = Fs::default().root(&path_str);
        let object_store = ObjectStore::new(fs_builder).unwrap().finish();

        let index_aux_path = temp_dir.path().join("index_aux");
        let puffin_mgr = PuffinManagerFactory::new(&index_aux_path, 4096, None, None)
            .await
            .unwrap();
        let intm_mgr = IntermediateManager::init_fs(index_aux_path.to_str().unwrap())
            .await
            .unwrap();

        let access_layer = Arc::new(AccessLayer::new(
            "",
            PathType::Bare,
            object_store,
            puffin_mgr,
            intm_mgr,
        ));

        let manager = RegionManifestManager::new(
            metadata.clone(),
            0,
            RegionManifestOptions {
                manifest_dir: "".to_string(),
                object_store: access_layer.object_store().clone(),
                compress_type: CompressionType::Uncompressed,
                checkpoint_distance: 10,
                remove_file_options: Default::default(),
            },
            Default::default(),
            Default::default(),
            FormatType::PrimaryKey,
        )
        .await
        .unwrap();

        let manifest_ctx = Arc::new(ManifestContext::new(
            manager,
            RegionRoleState::Leader(RegionLeaderState::Writable),
        ));

        let region = MitoRegion {
            region_id: metadata.region_id,
            version_control,
            access_layer,
            manifest_ctx: manifest_ctx.clone(),
            file_purger: crate::test_util::new_noop_file_purger(),
            provider: Provider::noop_provider(),
            last_flush_millis: Default::default(),
            last_compaction_millis: Default::default(),
            time_provider: Arc::new(StdTimeProvider),
            topic_latest_entry_id: Default::default(),
            written_bytes: Arc::new(AtomicU64::new(0)),
            memtable_builder: Arc::new(EmptyMemtableBuilder::default()),
            sst_format: FormatType::PrimaryKey,
            stats: ManifestStats::default(),
        };

        // Test the initial state
        assert_eq!(
            region.state(),
            RegionRoleState::Leader(RegionLeaderState::Writable)
        );
        assert!(!region.is_staging());

        // Test the transition to staging
        let mut manager = manifest_ctx.manifest_manager.write().await;
        region.set_staging(&mut manager).await.unwrap();
        drop(manager);
        assert_eq!(
            region.state(),
            RegionRoleState::Leader(RegionLeaderState::Staging)
        );
        assert!(region.is_staging());

        // Test the transition back to writable
        region.exit_staging().unwrap();
        assert_eq!(
            region.state(),
            RegionRoleState::Leader(RegionLeaderState::Writable)
        );
        assert!(!region.is_staging());

        // Test staging directory cleanup: create dirty staging files before entering staging mode
        {
            // Create some dummy staging manifest files to simulate an interrupted session
            let manager = manifest_ctx.manifest_manager.write().await;
            let dummy_actions = RegionMetaActionList::new(vec![]);
            let dummy_bytes = dummy_actions.encode().unwrap();

            // Create dirty staging files with versions 100 and 101
            manager.store().save(100, &dummy_bytes, true).await.unwrap();
            manager.store().save(101, &dummy_bytes, true).await.unwrap();
            drop(manager);

            // Verify the dirty files exist before entering staging
            let manager = manifest_ctx.manifest_manager.read().await;
            let dirty_manifests = manager.store().fetch_staging_manifests().await.unwrap();
            assert_eq!(
                dirty_manifests.len(),
                2,
                "Should have 2 dirty staging files"
            );
            drop(manager);

            // Enter staging mode - this should clean up the dirty files
            let mut manager = manifest_ctx.manifest_manager.write().await;
            region.set_staging(&mut manager).await.unwrap();
            drop(manager);

            // Verify the dirty files are cleaned up after entering staging
            let manager = manifest_ctx.manifest_manager.read().await;
            let cleaned_manifests = manager.store().fetch_staging_manifests().await.unwrap();
            assert_eq!(
                cleaned_manifests.len(),
                0,
                "Dirty staging files should be cleaned up"
            );
            drop(manager);

            // Exit staging to restore the normal state for the remaining tests
            region.exit_staging().unwrap();
        }

        // Test invalid transitions
        let mut manager = manifest_ctx.manifest_manager.write().await;
        assert!(region.set_staging(&mut manager).await.is_ok()); // Writable -> Staging should work
        drop(manager);
        let mut manager = manifest_ctx.manifest_manager.write().await;
        assert!(region.set_staging(&mut manager).await.is_err()); // Staging -> Staging should fail
        drop(manager);
        assert!(region.exit_staging().is_ok()); // Staging -> Writable should work
        assert!(region.exit_staging().is_err()); // Writable -> Writable should fail
    }
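
    // A minimal sketch added for illustration (not part of the original test suite):
    // `ManifestStats` simply mirrors the values stored in its shared atomic counters.
    #[test]
    fn test_manifest_stats_reads_shared_atomics() {
        use std::sync::atomic::Ordering;

        let stats = ManifestStats::default();
        stats.total_manifest_size.store(42, Ordering::Relaxed);
        stats.manifest_version.store(7, Ordering::Relaxed);
        assert_eq!(42, stats.total_manifest_size());
        assert_eq!(7, stats.manifest_version());
    }

    // Illustrative sketch (assumption: `OptionOutputTx::none()` is available for building
    // an empty sender): the first caller gets its sender back and must open the region
    // itself, while later callers for the same region are queued until the open finishes.
    #[test]
    fn test_wait_for_opening_region_queues_senders() {
        use crate::request::OptionOutputTx;

        let opening = crate::region::OpeningRegions::default();
        let region_id = RegionId::new(1024, 0);

        // No one is opening the region yet, so the sender is returned to the caller.
        assert!(
            opening
                .wait_for_opening_region(region_id, OptionOutputTx::none())
                .is_some()
        );

        // Mark the region as opening; subsequent waiters are queued.
        opening.insert_sender(region_id, OptionOutputTx::none());
        assert!(
            opening
                .wait_for_opening_region(region_id, OptionOutputTx::none())
                .is_none()
        );
        assert_eq!(2, opening.sender_len(region_id));
    }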
}