mito2/region.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Mito region.

pub mod catchup;
pub mod opener;
pub mod options;
pub(crate) mod version;

use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
use std::sync::{Arc, RwLock};

use common_telemetry::{error, info, warn};
use crossbeam_utils::atomic::AtomicCell;
use snafu::{OptionExt, ensure};
use store_api::ManifestVersion;
use store_api::codec::PrimaryKeyEncoding;
use store_api::logstore::provider::Provider;
use store_api::metadata::RegionMetadataRef;
use store_api::region_engine::{
    RegionManifestInfo, RegionRole, RegionStatistic, SettableRegionRoleState,
};
use store_api::sst_entry::ManifestSstEntry;
use store_api::storage::{RegionId, SequenceNumber};
use tokio::sync::RwLockWriteGuard;

use crate::access_layer::AccessLayerRef;
use crate::error::{
    FlushableRegionStateSnafu, RegionNotFoundSnafu, RegionStateSnafu, RegionTruncatedSnafu, Result,
    UpdateManifestSnafu,
};
use crate::manifest::action::{
    RegionChange, RegionManifest, RegionMetaAction, RegionMetaActionList,
};
use crate::manifest::manager::RegionManifestManager;
use crate::region::version::{VersionControlRef, VersionRef};
use crate::request::{OnFailure, OptionOutputTx};
use crate::sst::file_purger::FilePurgerRef;
use crate::sst::location::{index_file_path, sst_file_path};
use crate::time_provider::TimeProviderRef;

/// This is the approximate factor to estimate the size of wal.
const ESTIMATED_WAL_FACTOR: f32 = 0.42825;

/// Region usage, including the region id, WAL usage, SST usage and manifest usage.
#[derive(Debug)]
pub struct RegionUsage {
    pub region_id: RegionId,
    pub wal_usage: u64,
    pub sst_usage: u64,
    pub manifest_usage: u64,
}

impl RegionUsage {
    pub fn disk_usage(&self) -> u64 {
        self.wal_usage + self.sst_usage + self.manifest_usage
    }
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RegionLeaderState {
    /// The region is opened and is writable.
    Writable,
    /// The region is in staging mode - writable but no checkpoint/compaction.
    Staging,
    /// The region is altering.
    Altering,
    /// The region is dropping.
    Dropping,
    /// The region is truncating.
    Truncating,
    /// The region is handling a region edit.
    Editing,
    /// The region is stepping down.
    Downgrading,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RegionRoleState {
    Leader(RegionLeaderState),
    Follower,
}

/// Metadata and runtime status of a region.
///
/// Writing and reading a region follow a single-writer-multi-reader rule:
/// - Only the region worker thread this region belongs to can modify the metadata.
/// - Multiple reader threads are allowed to read a specific `version` of a region.
#[derive(Debug)]
pub struct MitoRegion {
    /// Id of this region.
    ///
    /// Accessing region id from the version control is inconvenient so
    /// we also store it here.
    pub(crate) region_id: RegionId,

    /// Version controller for this region.
    ///
    /// We MUST update the version control inside the write lock of the region manifest manager.
    pub(crate) version_control: VersionControlRef,
    /// SSTs accessor for this region.
    pub(crate) access_layer: AccessLayerRef,
    /// Context to maintain manifest for this region.
    pub(crate) manifest_ctx: ManifestContextRef,
    /// SST file purger.
    pub(crate) file_purger: FilePurgerRef,
    /// The provider of log store.
    pub(crate) provider: Provider,
    /// Last flush time in millis.
    last_flush_millis: AtomicI64,
    /// Last compaction time in millis.
    last_compaction_millis: AtomicI64,
    /// Provider to get current time.
    time_provider: TimeProviderRef,
    /// The topic's latest entry id since the region's last flushing.
    /// **Only used for remote WAL pruning.**
    ///
    /// The value is updated to the latest offset of the topic
    /// when the region receives a flush request or schedules a periodic flush task
    /// while its memtable is empty.
    ///
    /// There are no WAL entries in the range [flushed_entry_id, topic_latest_entry_id] for the current region,
    /// so the WAL may be pruned up to `topic_latest_entry_id`.
    pub(crate) topic_latest_entry_id: AtomicU64,
    /// The total bytes written to the region.
    pub(crate) written_bytes: Arc<AtomicU64>,
    /// manifest stats
    stats: ManifestStats,
}

pub type MitoRegionRef = Arc<MitoRegion>;

impl MitoRegion {
    /// Stop background managers for this region.
    pub(crate) async fn stop(&self) {
        self.manifest_ctx
            .manifest_manager
            .write()
            .await
            .stop()
            .await;

        info!(
            "Stopped region manifest manager, region_id: {}",
            self.region_id
        );
    }

    /// Returns current metadata of the region.
    pub(crate) fn metadata(&self) -> RegionMetadataRef {
        let version_data = self.version_control.current();
        version_data.version.metadata.clone()
    }

    /// Returns primary key encoding of the region.
    pub(crate) fn primary_key_encoding(&self) -> PrimaryKeyEncoding {
        let version_data = self.version_control.current();
        version_data.version.metadata.primary_key_encoding
    }

    /// Returns current version of the region.
    pub(crate) fn version(&self) -> VersionRef {
        let version_data = self.version_control.current();
        version_data.version
    }

    /// Returns last flush timestamp in millis.
    pub(crate) fn last_flush_millis(&self) -> i64 {
        self.last_flush_millis.load(Ordering::Relaxed)
    }

    /// Update flush time to current time.
    pub(crate) fn update_flush_millis(&self) {
        let now = self.time_provider.current_time_millis();
        self.last_flush_millis.store(now, Ordering::Relaxed);
    }

    /// Returns last compaction timestamp in millis.
    pub(crate) fn last_compaction_millis(&self) -> i64 {
        self.last_compaction_millis.load(Ordering::Relaxed)
    }

    /// Update compaction time to current time.
    pub(crate) fn update_compaction_millis(&self) {
        let now = self.time_provider.current_time_millis();
        self.last_compaction_millis.store(now, Ordering::Relaxed);
    }

    /// Returns the table dir.
    pub(crate) fn table_dir(&self) -> &str {
        self.access_layer.table_dir()
    }

    /// Returns whether the region is writable.
    pub(crate) fn is_writable(&self) -> bool {
        matches!(
            self.manifest_ctx.state.load(),
            RegionRoleState::Leader(RegionLeaderState::Writable)
                | RegionRoleState::Leader(RegionLeaderState::Staging)
        )
    }

    /// Returns whether the region is flushable.
    pub(crate) fn is_flushable(&self) -> bool {
        matches!(
            self.manifest_ctx.state.load(),
            RegionRoleState::Leader(RegionLeaderState::Writable)
                | RegionRoleState::Leader(RegionLeaderState::Staging)
                | RegionRoleState::Leader(RegionLeaderState::Downgrading)
        )
    }

    /// Returns whether the region should abort index building.
    pub(crate) fn should_abort_index(&self) -> bool {
        matches!(
            self.manifest_ctx.state.load(),
            RegionRoleState::Follower
                | RegionRoleState::Leader(RegionLeaderState::Dropping)
                | RegionRoleState::Leader(RegionLeaderState::Truncating)
                | RegionRoleState::Leader(RegionLeaderState::Downgrading)
                | RegionRoleState::Leader(RegionLeaderState::Staging)
        )
    }

    /// Returns whether the region is downgrading.
    pub(crate) fn is_downgrading(&self) -> bool {
        matches!(
            self.manifest_ctx.state.load(),
            RegionRoleState::Leader(RegionLeaderState::Downgrading)
        )
    }

    /// Returns whether the region is in staging mode.
    #[allow(dead_code)]
    pub(crate) fn is_staging(&self) -> bool {
        self.manifest_ctx.state.load() == RegionRoleState::Leader(RegionLeaderState::Staging)
    }

    pub fn region_id(&self) -> RegionId {
        self.region_id
    }

    pub fn find_committed_sequence(&self) -> SequenceNumber {
        self.version_control.committed_sequence()
    }

    /// Returns whether the region is readonly.
    pub fn is_follower(&self) -> bool {
        self.manifest_ctx.state.load() == RegionRoleState::Follower
    }

    /// Returns the state of the region.
    pub(crate) fn state(&self) -> RegionRoleState {
        self.manifest_ctx.state.load()
    }

    /// Sets the region role state.
    pub(crate) fn set_role(&self, next_role: RegionRole) {
        self.manifest_ctx.set_role(next_role, self.region_id);
    }

    /// Sets the altering state.
    /// You should call this method in the worker loop.
    pub(crate) fn set_altering(&self) -> Result<()> {
        self.compare_exchange_state(
            RegionLeaderState::Writable,
            RegionRoleState::Leader(RegionLeaderState::Altering),
        )
    }

    /// Sets the dropping state.
    /// You should call this method in the worker loop.
    pub(crate) fn set_dropping(&self) -> Result<()> {
        self.compare_exchange_state(
            RegionLeaderState::Writable,
            RegionRoleState::Leader(RegionLeaderState::Dropping),
        )
    }

    /// Sets the truncating state.
    /// You should call this method in the worker loop.
    pub(crate) fn set_truncating(&self) -> Result<()> {
        self.compare_exchange_state(
            RegionLeaderState::Writable,
            RegionRoleState::Leader(RegionLeaderState::Truncating),
        )
    }

    /// Sets the editing state.
    /// You should call this method in the worker loop.
    pub(crate) fn set_editing(&self) -> Result<()> {
        self.compare_exchange_state(
            RegionLeaderState::Writable,
            RegionRoleState::Leader(RegionLeaderState::Editing),
        )
    }

    /// Sets the staging state.
    ///
    /// You should call this method in the worker loop.
    /// Transitions from Writable to Staging state.
    /// Cleans any existing staging manifests before entering staging mode.
    pub(crate) async fn set_staging(
        &self,
        manager: &mut RwLockWriteGuard<'_, RegionManifestManager>,
    ) -> Result<()> {
        manager.store().clear_staging_manifests().await?;

        self.compare_exchange_state(
            RegionLeaderState::Writable,
            RegionRoleState::Leader(RegionLeaderState::Staging),
        )
    }

    /// Exits the staging state back to writable.
    ///
    /// You should call this method in the worker loop.
    /// Transitions from Staging to Writable state.
    fn exit_staging(&self) -> Result<()> {
        self.compare_exchange_state(
            RegionLeaderState::Staging,
            RegionRoleState::Leader(RegionLeaderState::Writable),
        )
    }

    /// Sets the region role state gracefully. This acquires the manifest write lock.
    pub(crate) async fn set_role_state_gracefully(
        &self,
        state: SettableRegionRoleState,
    ) -> Result<()> {
        let mut manager = self.manifest_ctx.manifest_manager.write().await;
        let current_state = self.state();

        match state {
            SettableRegionRoleState::Leader => {
                // Exit staging mode and return to normal writable leader
                // Only allowed from staging state
                match current_state {
                    RegionRoleState::Leader(RegionLeaderState::Staging) => {
                        info!("Exiting staging mode for region {}", self.region_id);
                        // Use the success exit path that merges all staged manifests
                        self.exit_staging_on_success(&mut manager).await?;
                    }
                    RegionRoleState::Leader(RegionLeaderState::Writable) => {
                        // Already in desired state - no-op
                        info!("Region {} already in normal leader mode", self.region_id);
                    }
                    _ => {
                        // Only staging -> leader transition is allowed
                        return Err(RegionStateSnafu {
                            region_id: self.region_id,
                            state: current_state,
                            expect: RegionRoleState::Leader(RegionLeaderState::Staging),
                        }
                        .build());
                    }
                }
            }

            SettableRegionRoleState::StagingLeader => {
                // Enter staging mode from normal writable leader
                // Only allowed from writable leader state
                match current_state {
                    RegionRoleState::Leader(RegionLeaderState::Writable) => {
                        info!("Entering staging mode for region {}", self.region_id);
                        self.set_staging(&mut manager).await?;
                    }
                    RegionRoleState::Leader(RegionLeaderState::Staging) => {
                        // Already in desired state - no-op
                        info!("Region {} already in staging mode", self.region_id);
                    }
                    _ => {
                        return Err(RegionStateSnafu {
                            region_id: self.region_id,
                            state: current_state,
                            expect: RegionRoleState::Leader(RegionLeaderState::Writable),
                        }
                        .build());
                    }
                }
            }

            SettableRegionRoleState::Follower => {
                // Make this region a follower
                match current_state {
                    RegionRoleState::Leader(RegionLeaderState::Staging) => {
                        info!(
                            "Exiting staging and demoting region {} to follower",
                            self.region_id
                        );
                        self.exit_staging()?;
                        self.set_role(RegionRole::Follower);
                    }
                    RegionRoleState::Leader(_) => {
                        info!("Demoting region {} from leader to follower", self.region_id);
                        self.set_role(RegionRole::Follower);
                    }
                    RegionRoleState::Follower => {
                        // Already in desired state - no-op
                        info!("Region {} already in follower mode", self.region_id);
                    }
                }
            }

            SettableRegionRoleState::DowngradingLeader => {
                // downgrade this region to downgrading leader
                match current_state {
                    RegionRoleState::Leader(RegionLeaderState::Staging) => {
                        info!(
                            "Exiting staging and entering downgrade for region {}",
                            self.region_id
                        );
                        self.exit_staging()?;
                        self.set_role(RegionRole::DowngradingLeader);
                    }
                    RegionRoleState::Leader(RegionLeaderState::Writable) => {
                        info!("Starting downgrade for region {}", self.region_id);
                        self.set_role(RegionRole::DowngradingLeader);
                    }
                    RegionRoleState::Leader(RegionLeaderState::Downgrading) => {
                        // Already in desired state - no-op
                        info!("Region {} already in downgrading mode", self.region_id);
                    }
                    _ => {
                        warn!(
                            "Cannot start downgrade for region {} from state {:?}",
                            self.region_id, current_state
                        );
                    }
                }
            }
        }

        // Hack(zhongzc): If we have just become leader (writable), persist any backfilled metadata.
        if self.state() == RegionRoleState::Leader(RegionLeaderState::Writable) {
            // Persist backfilled metadata if manifest is missing fields (e.g., partition_expr)
            let manifest_meta = &manager.manifest().metadata;
            let current_version = self.version();
            let current_meta = &current_version.metadata;
            if manifest_meta.partition_expr.is_none() && current_meta.partition_expr.is_some() {
                let action = RegionMetaAction::Change(RegionChange {
                    metadata: current_meta.clone(),
                    sst_format: current_version.options.sst_format.unwrap_or_default(),
                });
                let result = manager
                    .update(
                        RegionMetaActionList::with_action(action),
                        RegionRoleState::Leader(RegionLeaderState::Writable),
                    )
                    .await;

                match result {
                    Ok(version) => {
                        info!(
                            "Successfully persisted backfilled metadata for region {}, version: {}",
                            self.region_id, version
                        );
                    }
                    Err(e) => {
                        warn!(e; "Failed to persist backfilled metadata for region {}", self.region_id);
                    }
                }
            }
        }

        drop(manager);

        Ok(())
    }

    /// Switches the region state to `RegionRoleState::Leader(RegionLeaderState::Writable)` if the current state is `expect`.
    /// Otherwise, logs an error.
    pub(crate) fn switch_state_to_writable(&self, expect: RegionLeaderState) {
        if let Err(e) = self
            .compare_exchange_state(expect, RegionRoleState::Leader(RegionLeaderState::Writable))
        {
            error!(e; "failed to switch region state to writable, expect state is {:?}", expect);
        }
    }

    /// Returns the region statistic.
    pub(crate) fn region_statistic(&self) -> RegionStatistic {
        let version = self.version();
        let memtables = &version.memtables;
        let memtable_usage = (memtables.mutable_usage() + memtables.immutables_usage()) as u64;

        let sst_usage = version.ssts.sst_usage();
        let index_usage = version.ssts.index_usage();
        let flushed_entry_id = version.flushed_entry_id;

        let wal_usage = self.estimated_wal_usage(memtable_usage);
        let manifest_usage = self.stats.total_manifest_size();
        let num_rows = version.ssts.num_rows() + version.memtables.num_rows();
        let num_files = version.ssts.num_files();
        let manifest_version = self.stats.manifest_version();
        let file_removed_cnt = self.stats.file_removed_cnt();

        let topic_latest_entry_id = self.topic_latest_entry_id.load(Ordering::Relaxed);
        let written_bytes = self.written_bytes.load(Ordering::Relaxed);

        RegionStatistic {
            num_rows,
            memtable_size: memtable_usage,
            wal_size: wal_usage,
            manifest_size: manifest_usage,
            sst_size: sst_usage,
            sst_num: num_files,
            index_size: index_usage,
            manifest: RegionManifestInfo::Mito {
                manifest_version,
                flushed_entry_id,
                file_removed_cnt,
            },
            data_topic_latest_entry_id: topic_latest_entry_id,
            metadata_topic_latest_entry_id: topic_latest_entry_id,
            written_bytes,
        }
    }

    /// Estimates the WAL size in bytes.
    /// Uses the memtable size to estimate the WAL size.
    fn estimated_wal_usage(&self, memtable_usage: u64) -> u64 {
        ((memtable_usage as f32) * ESTIMATED_WAL_FACTOR) as u64
    }

    /// Sets the state of the region to the given state if the current state equals
    /// the expected one.
    fn compare_exchange_state(
        &self,
        expect: RegionLeaderState,
        state: RegionRoleState,
    ) -> Result<()> {
        self.manifest_ctx
            .state
            .compare_exchange(RegionRoleState::Leader(expect), state)
            .map_err(|actual| {
                RegionStateSnafu {
                    region_id: self.region_id,
                    state: actual,
                    expect: RegionRoleState::Leader(expect),
                }
                .build()
            })?;
        Ok(())
    }

    pub fn access_layer(&self) -> AccessLayerRef {
        self.access_layer.clone()
    }

    /// Returns the SST entries of the region.
    pub async fn manifest_sst_entries(&self) -> Vec<ManifestSstEntry> {
        let table_dir = self.table_dir();
        let path_type = self.access_layer.path_type();

        let visible_ssts = self
            .version()
            .ssts
            .levels()
            .iter()
            .flat_map(|level| level.files().map(|file| file.file_id().file_id()))
            .collect::<HashSet<_>>();

        self.manifest_ctx
            .manifest()
            .await
            .files
            .values()
            .map(|meta| {
                let region_id = self.region_id;
                let origin_region_id = meta.region_id;
                let (index_file_id, index_file_path, index_file_size) = if meta.index_file_size > 0
                {
                    let index_file_path =
                        index_file_path(table_dir, meta.index_file_id(), path_type);
                    (
                        Some(meta.index_file_id().file_id().to_string()),
                        Some(index_file_path),
                        Some(meta.index_file_size),
                    )
                } else {
                    (None, None, None)
                };
                let visible = visible_ssts.contains(&meta.file_id);
                ManifestSstEntry {
                    table_dir: table_dir.to_string(),
                    region_id,
                    table_id: region_id.table_id(),
                    region_number: region_id.region_number(),
                    region_group: region_id.region_group(),
                    region_sequence: region_id.region_sequence(),
                    file_id: meta.file_id.to_string(),
                    index_file_id,
                    level: meta.level,
                    file_path: sst_file_path(table_dir, meta.file_id(), path_type),
                    file_size: meta.file_size,
                    index_file_path,
                    index_file_size,
                    num_rows: meta.num_rows,
                    num_row_groups: meta.num_row_groups,
                    num_series: Some(meta.num_series),
                    min_ts: meta.time_range.0,
                    max_ts: meta.time_range.1,
                    sequence: meta.sequence.map(|s| s.get()),
                    origin_region_id,
                    node_id: None,
                    visible,
                }
            })
            .collect()
    }

    /// Exit staging mode successfully by merging all staged manifests and making them visible.
    pub(crate) async fn exit_staging_on_success(
        &self,
        manager: &mut RwLockWriteGuard<'_, RegionManifestManager>,
    ) -> Result<()> {
        let current_state = self.manifest_ctx.current_state();
        ensure!(
            current_state == RegionRoleState::Leader(RegionLeaderState::Staging),
            RegionStateSnafu {
                region_id: self.region_id,
                state: current_state,
                expect: RegionRoleState::Leader(RegionLeaderState::Staging),
            }
        );

        // Merge all staged manifest actions
        let merged_actions = match manager.merge_staged_actions(current_state).await? {
            Some(actions) => actions,
            None => {
                info!(
                    "No staged manifests to merge for region {}, exiting staging mode without changes",
                    self.region_id
                );
                // Even if no manifests to merge, we still need to exit staging mode
                self.exit_staging()?;
                return Ok(());
            }
        };

        // Submit merged actions using the manifest manager's update method
        // Pass the target state (Writable) so it saves to normal directory, not staging
        let target_state = RegionRoleState::Leader(RegionLeaderState::Writable);
        let new_version = manager.update(merged_actions.clone(), target_state).await?;

        info!(
            "Successfully submitted merged staged manifests for region {}, new version: {}",
            self.region_id, new_version
        );

        // Apply the merged changes to in-memory version control
        let merged_edit = merged_actions.into_region_edit();
        self.version_control
            .apply_edit(Some(merged_edit), &[], self.file_purger.clone());

        // Clear all staging manifests and transit state
        manager.store().clear_staging_manifests().await?;
        self.exit_staging()?;

        Ok(())
    }
}

/// Context to update the region manifest.
#[derive(Debug)]
pub(crate) struct ManifestContext {
    /// Manager to maintain manifest for this region.
    pub(crate) manifest_manager: tokio::sync::RwLock<RegionManifestManager>,
    /// The state of the region. The region checks the state before updating
    /// manifest.
    state: AtomicCell<RegionRoleState>,
}

impl ManifestContext {
    pub(crate) fn new(manager: RegionManifestManager, state: RegionRoleState) -> Self {
        ManifestContext {
            manifest_manager: tokio::sync::RwLock::new(manager),
            state: AtomicCell::new(state),
        }
    }

    pub(crate) async fn manifest_version(&self) -> ManifestVersion {
        self.manifest_manager
            .read()
            .await
            .manifest()
            .manifest_version
    }

    pub(crate) async fn has_update(&self) -> Result<bool> {
        self.manifest_manager.read().await.has_update().await
    }

    /// Returns the current region role state.
    pub(crate) fn current_state(&self) -> RegionRoleState {
        self.state.load()
    }

    /// Installs the manifest changes from the current version to the target version (inclusive).
    ///
    /// Returns installed [RegionManifest].
    /// **Note**: This function is not guaranteed to install the target version strictly.
    /// The installed version may be greater than the target version.
    pub(crate) async fn install_manifest_to(
        &self,
        version: ManifestVersion,
    ) -> Result<Arc<RegionManifest>> {
        let mut manager = self.manifest_manager.write().await;
        manager.install_manifest_to(version).await?;

        Ok(manager.manifest())
    }

    /// Updates the manifest if current state is `expect_state`.
    pub(crate) async fn update_manifest(
        &self,
        expect_state: RegionLeaderState,
        action_list: RegionMetaActionList,
    ) -> Result<ManifestVersion> {
        // Acquires the write lock of the manifest manager.
        let mut manager = self.manifest_manager.write().await;
        // Gets current manifest.
        let manifest = manager.manifest();
        // Checks state inside the lock. This is to ensure that we won't update the manifest
        // after `set_readonly_gracefully()` is called.
        let current_state = self.state.load();

        // If expect_state is not downgrading, the current state must be either `expect_state` or downgrading.
        //
        // A downgrading leader rejects user writes but still allows
        // flushing the memtable and updating the manifest.
        if expect_state != RegionLeaderState::Downgrading {
            if current_state == RegionRoleState::Leader(RegionLeaderState::Downgrading) {
                info!(
                    "Region {} is in downgrading leader state, updating manifest. state is {:?}",
                    manifest.metadata.region_id, expect_state
                );
            }
            ensure!(
                current_state == RegionRoleState::Leader(expect_state)
                    || current_state == RegionRoleState::Leader(RegionLeaderState::Downgrading),
                UpdateManifestSnafu {
                    region_id: manifest.metadata.region_id,
                    state: current_state,
                }
            );
        } else {
            ensure!(
                current_state == RegionRoleState::Leader(expect_state),
                RegionStateSnafu {
                    region_id: manifest.metadata.region_id,
                    state: current_state,
                    expect: RegionRoleState::Leader(expect_state),
                }
            );
        }

        for action in &action_list.actions {
            // Checks whether the edit is still applicable.
            let RegionMetaAction::Edit(edit) = &action else {
                continue;
            };

            // Checks whether the region is truncated.
            let Some(truncated_entry_id) = manifest.truncated_entry_id else {
                continue;
            };

            // This is an edit from flush.
            if let Some(flushed_entry_id) = edit.flushed_entry_id {
                ensure!(
                    truncated_entry_id < flushed_entry_id,
                    RegionTruncatedSnafu {
                        region_id: manifest.metadata.region_id,
                    }
                );
            }

            // This is an edit from compaction.
            if !edit.files_to_remove.is_empty() {
                // Checks whether the input files of the compaction task have been truncated.
                for file in &edit.files_to_remove {
                    ensure!(
                        manifest.files.contains_key(&file.file_id),
                        RegionTruncatedSnafu {
                            region_id: manifest.metadata.region_id,
                        }
                    );
                }
            }
        }

        // Now we can update the manifest.
        let version = manager.update(action_list, current_state).await.inspect_err(
            |e| error!(e; "Failed to update manifest, region_id: {}", manifest.metadata.region_id),
        )?;

        if self.state.load() == RegionRoleState::Follower {
            warn!(
                "Region {} becomes follower while updating manifest which may cause inconsistency, manifest version: {version}",
                manifest.metadata.region_id
            );
        }

        Ok(version)
    }

    /// Sets the [`RegionRole`].
    ///
    /// ```text
    ///     +------------------------------------------+
    ///     |                      +-----------------+ |
    ///     |                      |                 | |
    /// +---+------+       +-------+-----+        +--v-v---+
    /// | Follower |       | Downgrading |        | Leader |
    /// +---^-^----+       +-----+-^-----+        +--+-+---+
    ///     | |                  | |                 | |
    ///     | +------------------+ +-----------------+ |
    ///     +------------------------------------------+
    ///
    /// Transition:
    /// - Follower -> Leader
    /// - Downgrading Leader -> Leader
    /// - Leader -> Follower
    /// - Downgrading Leader -> Follower
    /// - Leader -> Downgrading Leader
    ///
    /// ```
    pub(crate) fn set_role(&self, next_role: RegionRole, region_id: RegionId) {
        match next_role {
            RegionRole::Follower => {
                match self.state.fetch_update(|state| {
                    if !matches!(state, RegionRoleState::Follower) {
                        Some(RegionRoleState::Follower)
                    } else {
                        None
                    }
                }) {
                    Ok(state) => info!(
                        "Convert region {} to follower, previous role state: {:?}",
                        region_id, state
                    ),
                    Err(state) => {
                        if state != RegionRoleState::Follower {
                            warn!(
                                "Failed to convert region {} to follower, current role state: {:?}",
                                region_id, state
                            )
                        }
                    }
                }
            }
            RegionRole::Leader => {
                match self.state.fetch_update(|state| {
                    if matches!(
                        state,
                        RegionRoleState::Follower
                            | RegionRoleState::Leader(RegionLeaderState::Downgrading)
                    ) {
                        Some(RegionRoleState::Leader(RegionLeaderState::Writable))
                    } else {
                        None
                    }
                }) {
                    Ok(state) => info!(
                        "Convert region {} to leader, previous role state: {:?}",
                        region_id, state
                    ),
                    Err(state) => {
                        if state != RegionRoleState::Leader(RegionLeaderState::Writable) {
                            warn!(
                                "Failed to convert region {} to leader, current role state: {:?}",
                                region_id, state
                            )
                        }
                    }
                }
            }
            RegionRole::DowngradingLeader => {
                match self.state.compare_exchange(
                    RegionRoleState::Leader(RegionLeaderState::Writable),
                    RegionRoleState::Leader(RegionLeaderState::Downgrading),
                ) {
                    Ok(state) => info!(
                        "Convert region {} to downgrading region, previous role state: {:?}",
                        region_id, state
                    ),
                    Err(state) => {
                        if state != RegionRoleState::Leader(RegionLeaderState::Downgrading) {
                            warn!(
                                "Failed to convert region {} to downgrading leader, current role state: {:?}",
                                region_id, state
                            )
                        }
                    }
                }
            }
        }
    }

    pub(crate) async fn manifest(&self) -> Arc<crate::manifest::action::RegionManifest> {
        self.manifest_manager.read().await.manifest()
    }
}

pub(crate) type ManifestContextRef = Arc<ManifestContext>;

/// Regions indexed by ids.
#[derive(Debug, Default)]
pub(crate) struct RegionMap {
    regions: RwLock<HashMap<RegionId, MitoRegionRef>>,
}

impl RegionMap {
    /// Returns true if the region exists.
    pub(crate) fn is_region_exists(&self, region_id: RegionId) -> bool {
        let regions = self.regions.read().unwrap();
        regions.contains_key(&region_id)
    }

    /// Inserts a new region into the map.
    pub(crate) fn insert_region(&self, region: MitoRegionRef) {
        let mut regions = self.regions.write().unwrap();
        regions.insert(region.region_id, region);
    }

    /// Gets region by region id.
    pub(crate) fn get_region(&self, region_id: RegionId) -> Option<MitoRegionRef> {
        let regions = self.regions.read().unwrap();
        regions.get(&region_id).cloned()
    }

    /// Gets writable region by region id.
    ///
    /// Returns error if the region does not exist or is readonly.
    pub(crate) fn writable_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
        let region = self
            .get_region(region_id)
            .context(RegionNotFoundSnafu { region_id })?;
        ensure!(
            region.is_writable(),
            RegionStateSnafu {
                region_id,
                state: region.state(),
                expect: RegionRoleState::Leader(RegionLeaderState::Writable),
            }
        );
        Ok(region)
    }

    /// Gets readonly region by region id.
    ///
    /// Returns error if the region does not exist or is writable.
    pub(crate) fn follower_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
        let region = self
            .get_region(region_id)
            .context(RegionNotFoundSnafu { region_id })?;
        ensure!(
            region.is_follower(),
            RegionStateSnafu {
                region_id,
                state: region.state(),
                expect: RegionRoleState::Follower,
            }
        );

        Ok(region)
    }

    /// Gets region by region id.
    ///
    /// Calls the callback if the region does not exist.
    pub(crate) fn get_region_or<F: OnFailure>(
        &self,
        region_id: RegionId,
        cb: &mut F,
    ) -> Option<MitoRegionRef> {
        match self
            .get_region(region_id)
            .context(RegionNotFoundSnafu { region_id })
        {
            Ok(region) => Some(region),
            Err(e) => {
                cb.on_failure(e);
                None
            }
        }
    }

    /// Gets writable region by region id.
    ///
    /// Calls the callback if the region does not exist or is readonly.
    pub(crate) fn writable_region_or<F: OnFailure>(
        &self,
        region_id: RegionId,
        cb: &mut F,
    ) -> Option<MitoRegionRef> {
        match self.writable_region(region_id) {
            Ok(region) => Some(region),
            Err(e) => {
                cb.on_failure(e);
                None
            }
        }
    }

    /// Gets writable non-staging region by region id.
    ///
    /// Returns error if the region does not exist, is readonly, or is in staging mode.
    pub(crate) fn writable_non_staging_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
        let region = self.writable_region(region_id)?;
        if region.is_staging() {
            return Err(crate::error::RegionStateSnafu {
                region_id,
                state: region.state(),
                expect: RegionRoleState::Leader(RegionLeaderState::Writable),
            }
            .build());
        }
        Ok(region)
    }

    /// Gets flushable region by region id.
    ///
    /// Returns error if the region does not exist or is not operable.
    fn flushable_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
        let region = self
            .get_region(region_id)
            .context(RegionNotFoundSnafu { region_id })?;
        ensure!(
            region.is_flushable(),
            FlushableRegionStateSnafu {
                region_id,
                state: region.state(),
            }
        );
        Ok(region)
    }

    /// Gets flushable region by region id.
    ///
    /// Calls the callback if the region does not exist or is not operable.
    pub(crate) fn flushable_region_or<F: OnFailure>(
        &self,
        region_id: RegionId,
        cb: &mut F,
    ) -> Option<MitoRegionRef> {
        match self.flushable_region(region_id) {
            Ok(region) => Some(region),
            Err(e) => {
                cb.on_failure(e);
                None
            }
        }
    }

    /// Remove region by id.
    pub(crate) fn remove_region(&self, region_id: RegionId) {
        let mut regions = self.regions.write().unwrap();
        regions.remove(&region_id);
    }

    /// List all regions.
    pub(crate) fn list_regions(&self) -> Vec<MitoRegionRef> {
        let regions = self.regions.read().unwrap();
        regions.values().cloned().collect()
    }

    /// Clear the map.
    pub(crate) fn clear(&self) {
        self.regions.write().unwrap().clear();
    }
}

pub(crate) type RegionMapRef = Arc<RegionMap>;

/// Opening regions
#[derive(Debug, Default)]
pub(crate) struct OpeningRegions {
    regions: RwLock<HashMap<RegionId, Vec<OptionOutputTx>>>,
}

impl OpeningRegions {
    /// Registers `sender` for an opening region and returns `None` if the region is opening;
    /// otherwise, returns the `sender` back to the caller.
    pub(crate) fn wait_for_opening_region(
        &self,
        region_id: RegionId,
        sender: OptionOutputTx,
    ) -> Option<OptionOutputTx> {
        let mut regions = self.regions.write().unwrap();
        match regions.entry(region_id) {
            Entry::Occupied(mut senders) => {
                senders.get_mut().push(sender);
                None
            }
            Entry::Vacant(_) => Some(sender),
        }
    }

    /// Returns true if the region exists.
    pub(crate) fn is_region_exists(&self, region_id: RegionId) -> bool {
        let regions = self.regions.read().unwrap();
        regions.contains_key(&region_id)
    }

    /// Inserts a sender for a new opening region into the map.
    pub(crate) fn insert_sender(&self, region: RegionId, sender: OptionOutputTx) {
        let mut regions = self.regions.write().unwrap();
        regions.insert(region, vec![sender]);
    }

    /// Removes the region by id and returns its senders.
    pub(crate) fn remove_sender(&self, region_id: RegionId) -> Vec<OptionOutputTx> {
        let mut regions = self.regions.write().unwrap();
        regions.remove(&region_id).unwrap_or_default()
    }

    #[cfg(test)]
    pub(crate) fn sender_len(&self, region_id: RegionId) -> usize {
        let regions = self.regions.read().unwrap();
        if let Some(senders) = regions.get(&region_id) {
            senders.len()
        } else {
            0
        }
    }
}

pub(crate) type OpeningRegionsRef = Arc<OpeningRegions>;

/// The regions that are catching up.
#[derive(Debug, Default)]
pub(crate) struct CatchupRegions {
    regions: RwLock<HashSet<RegionId>>,
}

impl CatchupRegions {
    /// Returns true if the region exists.
    pub(crate) fn is_region_exists(&self, region_id: RegionId) -> bool {
        let regions = self.regions.read().unwrap();
        regions.contains(&region_id)
    }

    /// Inserts a new region into the set.
    pub(crate) fn insert_region(&self, region_id: RegionId) {
        let mut regions = self.regions.write().unwrap();
        regions.insert(region_id);
    }

    /// Remove region by id.
    pub(crate) fn remove_region(&self, region_id: RegionId) {
        let mut regions = self.regions.write().unwrap();
        regions.remove(&region_id);
    }
}

pub(crate) type CatchupRegionsRef = Arc<CatchupRegions>;

/// Manifest stats.
#[derive(Default, Debug, Clone)]
pub struct ManifestStats {
    pub(crate) total_manifest_size: Arc<AtomicU64>,
    pub(crate) manifest_version: Arc<AtomicU64>,
    pub(crate) file_removed_cnt: Arc<AtomicU64>,
}

impl ManifestStats {
    fn total_manifest_size(&self) -> u64 {
        self.total_manifest_size.load(Ordering::Relaxed)
    }

    fn manifest_version(&self) -> u64 {
        self.manifest_version.load(Ordering::Relaxed)
    }

    fn file_removed_cnt(&self) -> u64 {
        self.file_removed_cnt.load(Ordering::Relaxed)
    }
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::sync::atomic::AtomicU64;

    use common_datasource::compression::CompressionType;
    use common_test_util::temp_dir::create_temp_dir;
    use crossbeam_utils::atomic::AtomicCell;
    use object_store::ObjectStore;
    use object_store::services::Fs;
    use store_api::logstore::provider::Provider;
    use store_api::region_engine::RegionRole;
    use store_api::region_request::PathType;
    use store_api::storage::RegionId;

    use crate::access_layer::AccessLayer;
    use crate::manifest::action::RegionMetaActionList;
    use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
    use crate::region::{
        ManifestContext, ManifestStats, MitoRegion, RegionLeaderState, RegionRoleState,
    };
    use crate::sst::FormatType;
    use crate::sst::index::intermediate::IntermediateManager;
    use crate::sst::index::puffin_manager::PuffinManagerFactory;
    use crate::test_util::scheduler_util::SchedulerEnv;
    use crate::test_util::version_util::VersionControlBuilder;
    use crate::time_provider::StdTimeProvider;

    #[test]
    fn test_region_state_lock_free() {
        assert!(AtomicCell::<RegionRoleState>::is_lock_free());
    }
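
    // A minimal sketch of the usage accounting in this module: `RegionUsage::disk_usage`
    // simply sums the WAL, SST and manifest usage.
    #[test]
    fn test_region_usage_disk_usage() {
        let usage = super::RegionUsage {
            region_id: RegionId::new(1024, 0),
            wal_usage: 10,
            sst_usage: 20,
            manifest_usage: 5,
        };
        assert_eq!(35, usage.disk_usage());
    }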

    #[tokio::test]
    async fn test_set_region_state() {
        let env = SchedulerEnv::new().await;
        let builder = VersionControlBuilder::new();
        let version_control = Arc::new(builder.build());
        let manifest_ctx = env
            .mock_manifest_context(version_control.current().version.metadata.clone())
            .await;

        let region_id = RegionId::new(1024, 0);
        // Leader -> Follower
        manifest_ctx.set_role(RegionRole::Follower, region_id);
        assert_eq!(manifest_ctx.state.load(), RegionRoleState::Follower);

        // Follower -> Leader
        manifest_ctx.set_role(RegionRole::Leader, region_id);
        assert_eq!(
            manifest_ctx.state.load(),
            RegionRoleState::Leader(RegionLeaderState::Writable)
        );

        // Leader -> Downgrading Leader
        manifest_ctx.set_role(RegionRole::DowngradingLeader, region_id);
        assert_eq!(
            manifest_ctx.state.load(),
            RegionRoleState::Leader(RegionLeaderState::Downgrading)
        );

        // Downgrading Leader -> Follower
        manifest_ctx.set_role(RegionRole::Follower, region_id);
        assert_eq!(manifest_ctx.state.load(), RegionRoleState::Follower);

        // Can't downgrade from follower (Follower -> Downgrading Leader)
        manifest_ctx.set_role(RegionRole::DowngradingLeader, region_id);
        assert_eq!(manifest_ctx.state.load(), RegionRoleState::Follower);

        // Set region role to Downgrading Leader
        manifest_ctx.set_role(RegionRole::Leader, region_id);
        manifest_ctx.set_role(RegionRole::DowngradingLeader, region_id);
        assert_eq!(
            manifest_ctx.state.load(),
            RegionRoleState::Leader(RegionLeaderState::Downgrading)
        );

        // Downgrading Leader -> Leader
        manifest_ctx.set_role(RegionRole::Leader, region_id);
        assert_eq!(
            manifest_ctx.state.load(),
            RegionRoleState::Leader(RegionLeaderState::Writable)
        );
    }
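
    // A minimal sketch of the WAL size estimation used by `region_statistic`:
    // `estimated_wal_usage` scales the memtable usage by `ESTIMATED_WAL_FACTOR`,
    // so the estimate stays below the memtable usage for a non-empty memtable.
    #[test]
    fn test_estimated_wal_factor() {
        let memtable_usage: u64 = 100 * 1024 * 1024;
        let estimated = ((memtable_usage as f32) * super::ESTIMATED_WAL_FACTOR) as u64;
        assert!(estimated > 0);
        assert!(estimated < memtable_usage);
    }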

    #[tokio::test]
    async fn test_staging_state_validation() {
        let env = SchedulerEnv::new().await;
        let builder = VersionControlBuilder::new();
        let version_control = Arc::new(builder.build());

        // Create context with staging state using the correct pattern from SchedulerEnv
        let staging_ctx = {
            let manager = RegionManifestManager::new(
                version_control.current().version.metadata.clone(),
                0,
                RegionManifestOptions {
                    manifest_dir: "".to_string(),
                    object_store: env.access_layer.object_store().clone(),
                    compress_type: CompressionType::Uncompressed,
                    checkpoint_distance: 10,
                    remove_file_options: Default::default(),
                },
                FormatType::PrimaryKey,
                &Default::default(),
            )
            .await
            .unwrap();
            Arc::new(ManifestContext::new(
                manager,
                RegionRoleState::Leader(RegionLeaderState::Staging),
            ))
        };

        // Test staging state behavior
        assert_eq!(
            staging_ctx.current_state(),
            RegionRoleState::Leader(RegionLeaderState::Staging)
        );

        // Test writable context for comparison
        let writable_ctx = env
            .mock_manifest_context(version_control.current().version.metadata.clone())
            .await;

        assert_eq!(
            writable_ctx.current_state(),
            RegionRoleState::Leader(RegionLeaderState::Writable)
        );
    }

    #[tokio::test]
    async fn test_staging_state_transitions() {
        let builder = VersionControlBuilder::new();
        let version_control = Arc::new(builder.build());
        let metadata = version_control.current().version.metadata.clone();

        // Create MitoRegion for testing state transitions
        let temp_dir = create_temp_dir("");
        let path_str = temp_dir.path().display().to_string();
        let fs_builder = Fs::default().root(&path_str);
        let object_store = ObjectStore::new(fs_builder).unwrap().finish();

        let index_aux_path = temp_dir.path().join("index_aux");
        let puffin_mgr = PuffinManagerFactory::new(&index_aux_path, 4096, None, None)
            .await
            .unwrap();
        let intm_mgr = IntermediateManager::init_fs(index_aux_path.to_str().unwrap())
            .await
            .unwrap();

        let access_layer = Arc::new(AccessLayer::new(
            "",
            PathType::Bare,
            object_store,
            puffin_mgr,
            intm_mgr,
        ));

        let manager = RegionManifestManager::new(
            metadata.clone(),
            0,
            RegionManifestOptions {
                manifest_dir: "".to_string(),
                object_store: access_layer.object_store().clone(),
                compress_type: CompressionType::Uncompressed,
                checkpoint_distance: 10,
                remove_file_options: Default::default(),
            },
            FormatType::PrimaryKey,
            &Default::default(),
        )
        .await
        .unwrap();

        let manifest_ctx = Arc::new(ManifestContext::new(
            manager,
            RegionRoleState::Leader(RegionLeaderState::Writable),
        ));

        let region = MitoRegion {
            region_id: metadata.region_id,
            version_control,
            access_layer,
            manifest_ctx: manifest_ctx.clone(),
            file_purger: crate::test_util::new_noop_file_purger(),
            provider: Provider::noop_provider(),
            last_flush_millis: Default::default(),
            last_compaction_millis: Default::default(),
            time_provider: Arc::new(StdTimeProvider),
            topic_latest_entry_id: Default::default(),
            written_bytes: Arc::new(AtomicU64::new(0)),
            stats: ManifestStats::default(),
        };

        // Test initial state
        assert_eq!(
            region.state(),
            RegionRoleState::Leader(RegionLeaderState::Writable)
        );
        assert!(!region.is_staging());

        // Test transition to staging
        let mut manager = manifest_ctx.manifest_manager.write().await;
        region.set_staging(&mut manager).await.unwrap();
        drop(manager);
        assert_eq!(
            region.state(),
            RegionRoleState::Leader(RegionLeaderState::Staging)
        );
        assert!(region.is_staging());

        // Test transition back to writable
        region.exit_staging().unwrap();
        assert_eq!(
            region.state(),
            RegionRoleState::Leader(RegionLeaderState::Writable)
        );
        assert!(!region.is_staging());

        // Test staging directory cleanup: Create dirty staging files before entering staging mode
        {
            // Create some dummy staging manifest files to simulate interrupted session
            let manager = manifest_ctx.manifest_manager.write().await;
            let dummy_actions = RegionMetaActionList::new(vec![]);
            let dummy_bytes = dummy_actions.encode().unwrap();

            // Create dirty staging files with versions 100 and 101
            manager.store().save(100, &dummy_bytes, true).await.unwrap();
            manager.store().save(101, &dummy_bytes, true).await.unwrap();
            drop(manager);

            // Verify dirty files exist before entering staging
            let manager = manifest_ctx.manifest_manager.read().await;
            let dirty_manifests = manager.store().fetch_staging_manifests().await.unwrap();
            assert_eq!(
                dirty_manifests.len(),
                2,
                "Should have 2 dirty staging files"
            );
            drop(manager);

            // Enter staging mode - this should clean up the dirty files
            let mut manager = manifest_ctx.manifest_manager.write().await;
            region.set_staging(&mut manager).await.unwrap();
            drop(manager);

            // Verify dirty files are cleaned up after entering staging
            let manager = manifest_ctx.manifest_manager.read().await;
            let cleaned_manifests = manager.store().fetch_staging_manifests().await.unwrap();
            assert_eq!(
                cleaned_manifests.len(),
                0,
                "Dirty staging files should be cleaned up"
            );
            drop(manager);

            // Exit staging to restore normal state for remaining tests
            region.exit_staging().unwrap();
        }

        // Test invalid transitions
        let mut manager = manifest_ctx.manifest_manager.write().await;
        assert!(region.set_staging(&mut manager).await.is_ok()); // Writable -> Staging should work
        drop(manager);
        let mut manager = manifest_ctx.manifest_manager.write().await;
        assert!(region.set_staging(&mut manager).await.is_err()); // Staging -> Staging should fail
        drop(manager);
        assert!(region.exit_staging().is_ok()); // Staging -> Writable should work
        assert!(region.exit_staging().is_err()); // Writable -> Writable should fail
    }
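
    // A minimal sketch of how the manifest statistics behind `region_statistic` are
    // tracked: `ManifestStats` holds shared atomic counters and its accessors simply
    // load the current values.
    #[test]
    fn test_manifest_stats_accessors() {
        use std::sync::atomic::Ordering;

        let stats = ManifestStats::default();
        stats.total_manifest_size.store(1024, Ordering::Relaxed);
        stats.manifest_version.store(3, Ordering::Relaxed);
        stats.file_removed_cnt.store(7, Ordering::Relaxed);

        assert_eq!(1024, stats.total_manifest_size());
        assert_eq!(3, stats.manifest_version());
        assert_eq!(7, stats.file_removed_cnt());
    }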
1467}