mito2/region.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Mito region.

pub mod opener;
pub mod options;
pub(crate) mod version;

use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
use std::sync::{Arc, RwLock};

use common_telemetry::{error, info, warn};
use crossbeam_utils::atomic::AtomicCell;
use snafu::{OptionExt, ensure};
use store_api::ManifestVersion;
use store_api::codec::PrimaryKeyEncoding;
use store_api::logstore::provider::Provider;
use store_api::metadata::RegionMetadataRef;
use store_api::region_engine::{
    RegionManifestInfo, RegionRole, RegionStatistic, SettableRegionRoleState,
};
use store_api::sst_entry::ManifestSstEntry;
use store_api::storage::{RegionId, SequenceNumber};
use tokio::sync::RwLockWriteGuard;

use crate::access_layer::AccessLayerRef;
use crate::error::{
    FlushableRegionStateSnafu, RegionNotFoundSnafu, RegionStateSnafu, RegionTruncatedSnafu, Result,
    UpdateManifestSnafu,
};
use crate::manifest::action::{
    RegionChange, RegionManifest, RegionMetaAction, RegionMetaActionList,
};
use crate::manifest::manager::RegionManifestManager;
use crate::memtable::MemtableBuilderRef;
use crate::region::version::{VersionControlRef, VersionRef};
use crate::request::{OnFailure, OptionOutputTx};
use crate::sst::FormatType;
use crate::sst::file_purger::FilePurgerRef;
use crate::sst::location::{index_file_path, sst_file_path};
use crate::time_provider::TimeProviderRef;
/// The approximate factor used to estimate the WAL size from the memtable size.
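///
/// For example, a region holding ~100 MiB of memtable data is estimated to
/// occupy roughly 100 MiB * 0.42825 ≈ 42.8 MiB in the WAL.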
const ESTIMATED_WAL_FACTOR: f32 = 0.42825;

/// Region usage includes the region id, WAL usage, SST usage and manifest usage.
#[derive(Debug)]
pub struct RegionUsage {
    pub region_id: RegionId,
    pub wal_usage: u64,
    pub sst_usage: u64,
    pub manifest_usage: u64,
}

impl RegionUsage {
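    /// Returns the total disk usage of the region in bytes: WAL + SST + manifest.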
    pub fn disk_usage(&self) -> u64 {
        self.wal_usage + self.sst_usage + self.manifest_usage
    }
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RegionLeaderState {
    /// The region is opened and is writable.
    Writable,
    /// The region is in staging mode - writable but no checkpoint/compaction.
    Staging,
    /// The region is altering.
    Altering,
    /// The region is dropping.
    Dropping,
    /// The region is truncating.
    Truncating,
    /// The region is handling a region edit.
    Editing,
    /// The region is stepping down.
    Downgrading,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RegionRoleState {
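    /// The region is a leader in the given [`RegionLeaderState`].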
    Leader(RegionLeaderState),
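    /// The region is a read-only follower.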
    Follower,
}

/// Metadata and runtime status of a region.
///
/// Writing and reading a region follow a single-writer-multi-reader rule:
/// - Only the region worker thread this region belongs to can modify the metadata.
/// - Multiple reader threads are allowed to read a specific `version` of a region.
#[derive(Debug)]
pub struct MitoRegion {
    /// Id of this region.
    ///
    /// Accessing region id from the version control is inconvenient so
    /// we also store it here.
    pub(crate) region_id: RegionId,

    /// Version controller for this region.
    ///
    /// We MUST update the version control inside the write lock of the region manifest manager.
    pub(crate) version_control: VersionControlRef,
    /// SSTs accessor for this region.
    pub(crate) access_layer: AccessLayerRef,
    /// Context to maintain manifest for this region.
    pub(crate) manifest_ctx: ManifestContextRef,
    /// SST file purger.
    pub(crate) file_purger: FilePurgerRef,
    /// The provider of log store.
    pub(crate) provider: Provider,
    /// Last flush time in millis.
    last_flush_millis: AtomicI64,
    /// Last compaction time in millis.
    last_compaction_millis: AtomicI64,
    /// Provider to get current time.
    time_provider: TimeProviderRef,
    /// The topic's latest entry id since the region's last flush.
    /// **Only used for remote WAL pruning.**
    ///
    /// The value will be updated to the latest offset of the topic
    /// if the region receives a flush request or schedules a periodic flush task
    /// and the region's memtable is empty.
    ///
    /// There are no WAL entries in the range [flushed_entry_id, topic_latest_entry_id] for the current region,
    /// which means these WAL entries may be pruned up to `topic_latest_entry_id`.
    pub(crate) topic_latest_entry_id: AtomicU64,
    /// The total bytes written to the region.
    pub(crate) written_bytes: Arc<AtomicU64>,
    /// Memtable builder for the region.
    pub(crate) memtable_builder: MemtableBuilderRef,
    /// Format type of the SST file.
    pub(crate) sst_format: FormatType,
    /// Manifest stats.
    stats: ManifestStats,
}

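/// Reference-counted pointer to a [`MitoRegion`].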
pub type MitoRegionRef = Arc<MitoRegion>;

impl MitoRegion {
    /// Stop background managers for this region.
    pub(crate) async fn stop(&self) {
        self.manifest_ctx
            .manifest_manager
            .write()
            .await
            .stop()
            .await;

        info!(
            "Stopped region manifest manager, region_id: {}",
            self.region_id
        );
    }

    /// Returns current metadata of the region.
    pub(crate) fn metadata(&self) -> RegionMetadataRef {
        let version_data = self.version_control.current();
        version_data.version.metadata.clone()
    }

    /// Returns primary key encoding of the region.
    pub(crate) fn primary_key_encoding(&self) -> PrimaryKeyEncoding {
        let version_data = self.version_control.current();
        version_data.version.metadata.primary_key_encoding
    }

    /// Returns current version of the region.
    pub(crate) fn version(&self) -> VersionRef {
        let version_data = self.version_control.current();
        version_data.version
    }

    /// Returns last flush timestamp in millis.
    pub(crate) fn last_flush_millis(&self) -> i64 {
        self.last_flush_millis.load(Ordering::Relaxed)
    }

    /// Update flush time to current time.
    pub(crate) fn update_flush_millis(&self) {
        let now = self.time_provider.current_time_millis();
        self.last_flush_millis.store(now, Ordering::Relaxed);
    }

    /// Returns last compaction timestamp in millis.
    pub(crate) fn last_compaction_millis(&self) -> i64 {
        self.last_compaction_millis.load(Ordering::Relaxed)
    }

    /// Returns format type of the SST file.
    pub(crate) fn sst_format(&self) -> FormatType {
        self.sst_format
    }

    /// Update compaction time to current time.
    pub(crate) fn update_compaction_millis(&self) {
        let now = self.time_provider.current_time_millis();
        self.last_compaction_millis.store(now, Ordering::Relaxed);
    }

    /// Returns the table dir.
    pub(crate) fn table_dir(&self) -> &str {
        self.access_layer.table_dir()
    }

    /// Returns whether the region is writable.
    pub(crate) fn is_writable(&self) -> bool {
        matches!(
            self.manifest_ctx.state.load(),
            RegionRoleState::Leader(RegionLeaderState::Writable)
                | RegionRoleState::Leader(RegionLeaderState::Staging)
        )
    }

    /// Returns whether the region is flushable.
    pub(crate) fn is_flushable(&self) -> bool {
        matches!(
            self.manifest_ctx.state.load(),
            RegionRoleState::Leader(RegionLeaderState::Writable)
                | RegionRoleState::Leader(RegionLeaderState::Staging)
                | RegionRoleState::Leader(RegionLeaderState::Downgrading)
        )
    }

    /// Returns whether the region is downgrading.
    pub(crate) fn is_downgrading(&self) -> bool {
        matches!(
            self.manifest_ctx.state.load(),
            RegionRoleState::Leader(RegionLeaderState::Downgrading)
        )
    }

    /// Returns whether the region is in staging mode.
    #[allow(dead_code)]
    pub(crate) fn is_staging(&self) -> bool {
        self.manifest_ctx.state.load() == RegionRoleState::Leader(RegionLeaderState::Staging)
    }

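    /// Returns the id of the region.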
    pub fn region_id(&self) -> RegionId {
        self.region_id
    }

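    /// Returns the latest committed sequence number of the region.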
    pub fn find_committed_sequence(&self) -> SequenceNumber {
        self.version_control.committed_sequence()
    }

    /// Returns whether the region is a follower (read-only).
    pub fn is_follower(&self) -> bool {
        self.manifest_ctx.state.load() == RegionRoleState::Follower
    }

    /// Returns the state of the region.
    pub(crate) fn state(&self) -> RegionRoleState {
        self.manifest_ctx.state.load()
    }

    /// Sets the region role state.
    pub(crate) fn set_role(&self, next_role: RegionRole) {
        self.manifest_ctx.set_role(next_role, self.region_id);
    }

    /// Sets the altering state.
    /// You should call this method in the worker loop.
    pub(crate) fn set_altering(&self) -> Result<()> {
        self.compare_exchange_state(
            RegionLeaderState::Writable,
            RegionRoleState::Leader(RegionLeaderState::Altering),
        )
    }

    /// Sets the dropping state.
    /// You should call this method in the worker loop.
    pub(crate) fn set_dropping(&self) -> Result<()> {
        self.compare_exchange_state(
            RegionLeaderState::Writable,
            RegionRoleState::Leader(RegionLeaderState::Dropping),
        )
    }

    /// Sets the truncating state.
    /// You should call this method in the worker loop.
    pub(crate) fn set_truncating(&self) -> Result<()> {
        self.compare_exchange_state(
            RegionLeaderState::Writable,
            RegionRoleState::Leader(RegionLeaderState::Truncating),
        )
    }

    /// Sets the editing state.
    /// You should call this method in the worker loop.
    pub(crate) fn set_editing(&self) -> Result<()> {
        self.compare_exchange_state(
            RegionLeaderState::Writable,
            RegionRoleState::Leader(RegionLeaderState::Editing),
        )
    }

    /// Sets the staging state.
    ///
    /// You should call this method in the worker loop.
    /// Transitions from Writable to Staging state.
    /// Cleans any existing staging manifests before entering staging mode.
    pub(crate) async fn set_staging(
        &self,
        manager: &mut RwLockWriteGuard<'_, RegionManifestManager>,
    ) -> Result<()> {
        manager.store().clear_staging_manifests().await?;

        self.compare_exchange_state(
            RegionLeaderState::Writable,
            RegionRoleState::Leader(RegionLeaderState::Staging),
        )
    }

    /// Exits the staging state back to writable.
    ///
    /// You should call this method in the worker loop.
    /// Transitions from Staging to Writable state.
    fn exit_staging(&self) -> Result<()> {
        self.compare_exchange_state(
            RegionLeaderState::Staging,
            RegionRoleState::Leader(RegionLeaderState::Writable),
        )
    }

    /// Sets the region role state gracefully. This acquires the manifest write lock.
    pub(crate) async fn set_role_state_gracefully(
        &self,
        state: SettableRegionRoleState,
    ) -> Result<()> {
        let mut manager = self.manifest_ctx.manifest_manager.write().await;
        let current_state = self.state();

        match state {
            SettableRegionRoleState::Leader => {
                // Exit staging mode and return to normal writable leader.
                // Only allowed from staging state.
                match current_state {
                    RegionRoleState::Leader(RegionLeaderState::Staging) => {
                        info!("Exiting staging mode for region {}", self.region_id);
                        // Use the success exit path that merges all staged manifests.
                        self.exit_staging_on_success(&mut manager).await?;
                    }
                    RegionRoleState::Leader(RegionLeaderState::Writable) => {
                        // Already in desired state - no-op.
                        info!("Region {} already in normal leader mode", self.region_id);
                    }
                    _ => {
                        // Only staging -> leader transition is allowed.
                        return Err(RegionStateSnafu {
                            region_id: self.region_id,
                            state: current_state,
                            expect: RegionRoleState::Leader(RegionLeaderState::Staging),
                        }
                        .build());
                    }
                }
            }

            SettableRegionRoleState::StagingLeader => {
                // Enter staging mode from normal writable leader.
                // Only allowed from writable leader state.
                match current_state {
                    RegionRoleState::Leader(RegionLeaderState::Writable) => {
                        info!("Entering staging mode for region {}", self.region_id);
                        self.set_staging(&mut manager).await?;
                    }
                    RegionRoleState::Leader(RegionLeaderState::Staging) => {
                        // Already in desired state - no-op.
                        info!("Region {} already in staging mode", self.region_id);
                    }
                    _ => {
                        return Err(RegionStateSnafu {
                            region_id: self.region_id,
                            state: current_state,
                            expect: RegionRoleState::Leader(RegionLeaderState::Writable),
                        }
                        .build());
                    }
                }
            }

            SettableRegionRoleState::Follower => {
                // Make this region a follower.
                match current_state {
                    RegionRoleState::Leader(RegionLeaderState::Staging) => {
                        info!(
                            "Exiting staging and demoting region {} to follower",
                            self.region_id
                        );
                        self.exit_staging()?;
                        self.set_role(RegionRole::Follower);
                    }
                    RegionRoleState::Leader(_) => {
                        info!("Demoting region {} from leader to follower", self.region_id);
                        self.set_role(RegionRole::Follower);
                    }
                    RegionRoleState::Follower => {
                        // Already in desired state - no-op.
                        info!("Region {} already in follower mode", self.region_id);
                    }
                }
            }

            SettableRegionRoleState::DowngradingLeader => {
                // Downgrade this region to downgrading leader.
                match current_state {
                    RegionRoleState::Leader(RegionLeaderState::Staging) => {
                        info!(
                            "Exiting staging and entering downgrade for region {}",
                            self.region_id
                        );
                        self.exit_staging()?;
                        self.set_role(RegionRole::DowngradingLeader);
                    }
                    RegionRoleState::Leader(RegionLeaderState::Writable) => {
                        info!("Starting downgrade for region {}", self.region_id);
                        self.set_role(RegionRole::DowngradingLeader);
                    }
                    RegionRoleState::Leader(RegionLeaderState::Downgrading) => {
                        // Already in desired state - no-op.
                        info!("Region {} already in downgrading mode", self.region_id);
                    }
                    _ => {
                        warn!(
                            "Cannot start downgrade for region {} from state {:?}",
                            self.region_id, current_state
                        );
                    }
                }
            }
        }

        // Hack(zhongzc): If we have just become leader (writable), persist any backfilled metadata.
        if self.state() == RegionRoleState::Leader(RegionLeaderState::Writable) {
            // Persist backfilled metadata if the manifest is missing fields (e.g., partition_expr).
            let manifest_meta = &manager.manifest().metadata;
            let current_meta = &self.version().metadata;
            if manifest_meta.partition_expr.is_none() && current_meta.partition_expr.is_some() {
                let action = RegionMetaAction::Change(RegionChange {
                    metadata: current_meta.clone(),
                    sst_format: self.sst_format(),
                });
                let result = manager
                    .update(
                        RegionMetaActionList::with_action(action),
                        RegionRoleState::Leader(RegionLeaderState::Writable),
                    )
                    .await;

                match result {
                    Ok(version) => {
                        info!(
                            "Successfully persisted backfilled metadata for region {}, version: {}",
                            self.region_id, version
                        );
                    }
                    Err(e) => {
                        warn!(e; "Failed to persist backfilled metadata for region {}", self.region_id);
                    }
                }
            }
        }

        drop(manager);

        Ok(())
    }

    /// Switches the region state to `RegionRoleState::Leader(RegionLeaderState::Writable)` if the current state is `expect`.
    /// Otherwise, logs an error.
    pub(crate) fn switch_state_to_writable(&self, expect: RegionLeaderState) {
        if let Err(e) = self
            .compare_exchange_state(expect, RegionRoleState::Leader(RegionLeaderState::Writable))
        {
            error!(e; "failed to switch region state to writable, expect state is {:?}", expect);
        }
    }

    /// Returns the region statistic.
    pub(crate) fn region_statistic(&self) -> RegionStatistic {
        let version = self.version();
        let memtables = &version.memtables;
        let memtable_usage = (memtables.mutable_usage() + memtables.immutables_usage()) as u64;

        let sst_usage = version.ssts.sst_usage();
        let index_usage = version.ssts.index_usage();
        let flushed_entry_id = version.flushed_entry_id;

        let wal_usage = self.estimated_wal_usage(memtable_usage);
        let manifest_usage = self.stats.total_manifest_size();
        let num_rows = version.ssts.num_rows() + version.memtables.num_rows();
        let num_files = version.ssts.num_files();
        let manifest_version = self.stats.manifest_version();

        let topic_latest_entry_id = self.topic_latest_entry_id.load(Ordering::Relaxed);
        let written_bytes = self.written_bytes.load(Ordering::Relaxed);

        RegionStatistic {
            num_rows,
            memtable_size: memtable_usage,
            wal_size: wal_usage,
            manifest_size: manifest_usage,
            sst_size: sst_usage,
            sst_num: num_files,
            index_size: index_usage,
            manifest: RegionManifestInfo::Mito {
                manifest_version,
                flushed_entry_id,
            },
            data_topic_latest_entry_id: topic_latest_entry_id,
            metadata_topic_latest_entry_id: topic_latest_entry_id,
            written_bytes,
        }
    }

    /// Estimated WAL size in bytes.
    /// Uses the memtable size to estimate the WAL size.
    fn estimated_wal_usage(&self, memtable_usage: u64) -> u64 {
        ((memtable_usage as f32) * ESTIMATED_WAL_FACTOR) as u64
    }

    /// Sets the state of the region to the given state if the current state equals
    /// the expected one.
    fn compare_exchange_state(
        &self,
        expect: RegionLeaderState,
        state: RegionRoleState,
    ) -> Result<()> {
        self.manifest_ctx
            .state
            .compare_exchange(RegionRoleState::Leader(expect), state)
            .map_err(|actual| {
                RegionStateSnafu {
                    region_id: self.region_id,
                    state: actual,
                    expect: RegionRoleState::Leader(expect),
                }
                .build()
            })?;
        Ok(())
    }

    /// Returns the SST entries recorded in the region's manifest.
    pub async fn manifest_sst_entries(&self) -> Vec<ManifestSstEntry> {
        let table_dir = self.table_dir();
        let path_type = self.access_layer.path_type();

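        // File ids referenced by the current in-memory version; manifest
        // entries missing from this set are reported as not visible.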
        let visible_ssts = self
            .version()
            .ssts
            .levels()
            .iter()
            .flat_map(|level| level.files().map(|file| file.file_id().file_id()))
            .collect::<HashSet<_>>();

        self.manifest_ctx
            .manifest()
            .await
            .files
            .values()
            .map(|meta| {
                let region_id = self.region_id;
                let origin_region_id = meta.region_id;
                let (index_file_path, index_file_size) = if meta.index_file_size > 0 {
                    let index_file_path = index_file_path(table_dir, meta.file_id(), path_type);
                    (Some(index_file_path), Some(meta.index_file_size))
                } else {
                    (None, None)
                };
                let visible = visible_ssts.contains(&meta.file_id);
                ManifestSstEntry {
                    table_dir: table_dir.to_string(),
                    region_id,
                    table_id: region_id.table_id(),
                    region_number: region_id.region_number(),
                    region_group: region_id.region_group(),
                    region_sequence: region_id.region_sequence(),
                    file_id: meta.file_id.to_string(),
                    level: meta.level,
                    file_path: sst_file_path(table_dir, meta.file_id(), path_type),
                    file_size: meta.file_size,
                    index_file_path,
                    index_file_size,
                    num_rows: meta.num_rows,
                    num_row_groups: meta.num_row_groups,
                    min_ts: meta.time_range.0,
                    max_ts: meta.time_range.1,
                    sequence: meta.sequence.map(|s| s.get()),
                    origin_region_id,
                    node_id: None,
                    visible,
                }
            })
            .collect()
    }

    /// Exits staging mode successfully by merging all staged manifests and making them visible.
    pub(crate) async fn exit_staging_on_success(
        &self,
        manager: &mut RwLockWriteGuard<'_, RegionManifestManager>,
    ) -> Result<()> {
        let current_state = self.manifest_ctx.current_state();
        ensure!(
            current_state == RegionRoleState::Leader(RegionLeaderState::Staging),
            RegionStateSnafu {
                region_id: self.region_id,
                state: current_state,
                expect: RegionRoleState::Leader(RegionLeaderState::Staging),
            }
        );

        // Merge all staged manifest actions.
        let merged_actions = match manager.merge_staged_actions(current_state).await? {
            Some(actions) => actions,
            None => {
                info!(
                    "No staged manifests to merge for region {}, exiting staging mode without changes",
                    self.region_id
                );
                // Even if there are no manifests to merge, we still need to exit staging mode.
                self.exit_staging()?;
                return Ok(());
            }
        };

        // Submit merged actions using the manifest manager's update method.
        // Pass the target state (Writable) so it saves to the normal directory, not staging.
        let target_state = RegionRoleState::Leader(RegionLeaderState::Writable);
        let new_version = manager.update(merged_actions.clone(), target_state).await?;

        info!(
            "Successfully submitted merged staged manifests for region {}, new version: {}",
            self.region_id, new_version
        );

        // Apply the merged changes to the in-memory version control.
        let merged_edit = merged_actions.into_region_edit();
        self.version_control
            .apply_edit(Some(merged_edit), &[], self.file_purger.clone());

        // Clear all staging manifests and transit the state.
        manager.store().clear_staging_manifests().await?;
        self.exit_staging()?;

        Ok(())
    }
}

/// Context to update the region manifest.
#[derive(Debug)]
pub(crate) struct ManifestContext {
    /// Manager to maintain manifest for this region.
    pub(crate) manifest_manager: tokio::sync::RwLock<RegionManifestManager>,
    /// The state of the region. The region checks the state before updating
    /// manifest.
    state: AtomicCell<RegionRoleState>,
}

impl ManifestContext {
    pub(crate) fn new(manager: RegionManifestManager, state: RegionRoleState) -> Self {
        ManifestContext {
            manifest_manager: tokio::sync::RwLock::new(manager),
            state: AtomicCell::new(state),
        }
    }

    pub(crate) async fn manifest_version(&self) -> ManifestVersion {
        self.manifest_manager
            .read()
            .await
            .manifest()
            .manifest_version
    }

    pub(crate) async fn has_update(&self) -> Result<bool> {
        self.manifest_manager.read().await.has_update().await
    }

    /// Returns the current region role state.
    pub(crate) fn current_state(&self) -> RegionRoleState {
        self.state.load()
    }

    /// Installs the manifest changes from the current version to the target version (inclusive).
    ///
    /// Returns the installed [RegionManifest].
    /// **Note**: This function is not guaranteed to install the target version strictly.
    /// The installed version may be greater than the target version.
    pub(crate) async fn install_manifest_to(
        &self,
        version: ManifestVersion,
    ) -> Result<Arc<RegionManifest>> {
        let mut manager = self.manifest_manager.write().await;
        manager.install_manifest_to(version).await?;

        Ok(manager.manifest())
    }

    /// Updates the manifest if the current state is `expect_state`.
    pub(crate) async fn update_manifest(
        &self,
        expect_state: RegionLeaderState,
        action_list: RegionMetaActionList,
    ) -> Result<ManifestVersion> {
        // Acquires the write lock of the manifest manager.
        let mut manager = self.manifest_manager.write().await;
        // Gets the current manifest.
        let manifest = manager.manifest();
        // Checks the state inside the lock. This is to ensure that we won't update the manifest
        // after `set_readonly_gracefully()` is called.
        let current_state = self.state.load();

        // If expect_state is not downgrading, the current state must be either `expect_state` or downgrading.
        //
        // A downgrading leader rejects user writes but still allows
        // flushing the memtable and updating the manifest.
        if expect_state != RegionLeaderState::Downgrading {
            if current_state == RegionRoleState::Leader(RegionLeaderState::Downgrading) {
                info!(
                    "Region {} is in downgrading leader state, updating manifest. state is {:?}",
                    manifest.metadata.region_id, expect_state
                );
            }
            ensure!(
                current_state == RegionRoleState::Leader(expect_state)
                    || current_state == RegionRoleState::Leader(RegionLeaderState::Downgrading),
                UpdateManifestSnafu {
                    region_id: manifest.metadata.region_id,
                    state: current_state,
                }
            );
        } else {
            ensure!(
                current_state == RegionRoleState::Leader(expect_state),
                RegionStateSnafu {
                    region_id: manifest.metadata.region_id,
                    state: current_state,
                    expect: RegionRoleState::Leader(expect_state),
                }
            );
        }

        for action in &action_list.actions {
            // Checks whether the edit is still applicable.
            let RegionMetaAction::Edit(edit) = &action else {
                continue;
            };

            // Checks whether the region is truncated.
            let Some(truncated_entry_id) = manifest.truncated_entry_id else {
                continue;
            };

            // This is an edit from flush.
            if let Some(flushed_entry_id) = edit.flushed_entry_id {
                ensure!(
                    truncated_entry_id < flushed_entry_id,
                    RegionTruncatedSnafu {
                        region_id: manifest.metadata.region_id,
                    }
                );
            }

            // This is an edit from compaction.
            if !edit.files_to_remove.is_empty() {
                // Ensures input files of the compaction task have not been truncated.
                for file in &edit.files_to_remove {
                    ensure!(
                        manifest.files.contains_key(&file.file_id),
                        RegionTruncatedSnafu {
                            region_id: manifest.metadata.region_id,
                        }
                    );
                }
            }
        }

        // Now we can update the manifest.
        let version = manager.update(action_list, current_state).await.inspect_err(
            |e| error!(e; "Failed to update manifest, region_id: {}", manifest.metadata.region_id),
        )?;

        if self.state.load() == RegionRoleState::Follower {
            warn!(
                "Region {} becomes follower while updating manifest which may cause inconsistency, manifest version: {version}",
                manifest.metadata.region_id
            );
        }

        Ok(version)
    }

    /// Sets the [`RegionRole`].
    ///
    /// ```text
    ///     +------------------------------------------+
    ///     |                      +-----------------+ |
    ///     |                      |                 | |
    /// +---+------+       +-------+-----+        +--v-v---+
    /// | Follower |       | Downgrading |        | Leader |
    /// +---^-^----+       +-----+-^-----+        +--+-+---+
    ///     | |                  | |                 | |
    ///     | +------------------+ +-----------------+ |
    ///     +------------------------------------------+
    ///
    /// Transition:
    /// - Follower -> Leader
    /// - Downgrading Leader -> Leader
    /// - Leader -> Follower
    /// - Downgrading Leader -> Follower
    /// - Leader -> Downgrading Leader
    ///
    /// ```
    pub(crate) fn set_role(&self, next_role: RegionRole, region_id: RegionId) {
        match next_role {
            RegionRole::Follower => {
                match self.state.fetch_update(|state| {
                    if !matches!(state, RegionRoleState::Follower) {
                        Some(RegionRoleState::Follower)
                    } else {
                        None
                    }
                }) {
                    Ok(state) => info!(
                        "Convert region {} to follower, previous role state: {:?}",
                        region_id, state
                    ),
                    Err(state) => {
                        if state != RegionRoleState::Follower {
                            warn!(
                                "Failed to convert region {} to follower, current role state: {:?}",
                                region_id, state
                            )
                        }
                    }
                }
            }
            RegionRole::Leader => {
                match self.state.fetch_update(|state| {
                    if matches!(
                        state,
                        RegionRoleState::Follower
                            | RegionRoleState::Leader(RegionLeaderState::Downgrading)
                    ) {
                        Some(RegionRoleState::Leader(RegionLeaderState::Writable))
                    } else {
                        None
                    }
                }) {
                    Ok(state) => info!(
                        "Convert region {} to leader, previous role state: {:?}",
                        region_id, state
                    ),
                    Err(state) => {
                        if state != RegionRoleState::Leader(RegionLeaderState::Writable) {
                            warn!(
                                "Failed to convert region {} to leader, current role state: {:?}",
                                region_id, state
                            )
                        }
                    }
                }
            }
            RegionRole::DowngradingLeader => {
                match self.state.compare_exchange(
                    RegionRoleState::Leader(RegionLeaderState::Writable),
                    RegionRoleState::Leader(RegionLeaderState::Downgrading),
                ) {
                    Ok(state) => info!(
                        "Convert region {} to downgrading region, previous role state: {:?}",
                        region_id, state
                    ),
                    Err(state) => {
                        if state != RegionRoleState::Leader(RegionLeaderState::Downgrading) {
                            warn!(
                                "Failed to convert region {} to downgrading leader, current role state: {:?}",
                                region_id, state
                            )
                        }
                    }
                }
            }
        }
    }

    pub(crate) async fn manifest(&self) -> Arc<crate::manifest::action::RegionManifest> {
        self.manifest_manager.read().await.manifest()
    }
}

pub(crate) type ManifestContextRef = Arc<ManifestContext>;

/// Regions indexed by ids.
#[derive(Debug, Default)]
pub(crate) struct RegionMap {
    regions: RwLock<HashMap<RegionId, MitoRegionRef>>,
}

impl RegionMap {
    /// Returns true if the region exists.
    pub(crate) fn is_region_exists(&self, region_id: RegionId) -> bool {
        let regions = self.regions.read().unwrap();
        regions.contains_key(&region_id)
    }

    /// Inserts a new region into the map.
    pub(crate) fn insert_region(&self, region: MitoRegionRef) {
        let mut regions = self.regions.write().unwrap();
        regions.insert(region.region_id, region);
    }

    /// Gets region by region id.
    pub(crate) fn get_region(&self, region_id: RegionId) -> Option<MitoRegionRef> {
        let regions = self.regions.read().unwrap();
        regions.get(&region_id).cloned()
    }

    /// Gets writable region by region id.
    ///
    /// Returns error if the region does not exist or is readonly.
    pub(crate) fn writable_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
        let region = self
            .get_region(region_id)
            .context(RegionNotFoundSnafu { region_id })?;
        ensure!(
            region.is_writable(),
            RegionStateSnafu {
                region_id,
                state: region.state(),
                expect: RegionRoleState::Leader(RegionLeaderState::Writable),
            }
        );
        Ok(region)
    }

    /// Gets follower region by region id.
    ///
    /// Returns error if the region does not exist or is not a follower.
    pub(crate) fn follower_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
        let region = self
            .get_region(region_id)
            .context(RegionNotFoundSnafu { region_id })?;
        ensure!(
            region.is_follower(),
            RegionStateSnafu {
                region_id,
                state: region.state(),
                expect: RegionRoleState::Follower,
            }
        );

        Ok(region)
    }

    /// Gets region by region id.
    ///
    /// Calls the callback if the region does not exist.
    pub(crate) fn get_region_or<F: OnFailure>(
        &self,
        region_id: RegionId,
        cb: &mut F,
    ) -> Option<MitoRegionRef> {
        match self
            .get_region(region_id)
            .context(RegionNotFoundSnafu { region_id })
        {
            Ok(region) => Some(region),
            Err(e) => {
                cb.on_failure(e);
                None
            }
        }
    }

    /// Gets writable region by region id.
    ///
    /// Calls the callback if the region does not exist or is readonly.
    pub(crate) fn writable_region_or<F: OnFailure>(
        &self,
        region_id: RegionId,
        cb: &mut F,
    ) -> Option<MitoRegionRef> {
        match self.writable_region(region_id) {
            Ok(region) => Some(region),
            Err(e) => {
                cb.on_failure(e);
                None
            }
        }
    }

    /// Gets writable non-staging region by region id.
    ///
    /// Returns error if the region does not exist, is readonly, or is in staging mode.
    pub(crate) fn writable_non_staging_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
        let region = self.writable_region(region_id)?;
        if region.is_staging() {
            return Err(crate::error::RegionStateSnafu {
                region_id,
                state: region.state(),
                expect: RegionRoleState::Leader(RegionLeaderState::Writable),
            }
            .build());
        }
        Ok(region)
    }

    /// Gets flushable region by region id.
    ///
    /// Returns error if the region does not exist or is not operable.
    fn flushable_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
        let region = self
            .get_region(region_id)
            .context(RegionNotFoundSnafu { region_id })?;
        ensure!(
            region.is_flushable(),
            FlushableRegionStateSnafu {
                region_id,
                state: region.state(),
            }
        );
        Ok(region)
    }

    /// Gets flushable region by region id.
    ///
    /// Calls the callback if the region does not exist or is not operable.
    pub(crate) fn flushable_region_or<F: OnFailure>(
        &self,
        region_id: RegionId,
        cb: &mut F,
    ) -> Option<MitoRegionRef> {
        match self.flushable_region(region_id) {
            Ok(region) => Some(region),
            Err(e) => {
                cb.on_failure(e);
                None
            }
        }
    }

    /// Remove region by id.
    pub(crate) fn remove_region(&self, region_id: RegionId) {
        let mut regions = self.regions.write().unwrap();
        regions.remove(&region_id);
    }

    /// List all regions.
    pub(crate) fn list_regions(&self) -> Vec<MitoRegionRef> {
        let regions = self.regions.read().unwrap();
        regions.values().cloned().collect()
    }

    /// Clear the map.
    pub(crate) fn clear(&self) {
        self.regions.write().unwrap().clear();
    }
}

pub(crate) type RegionMapRef = Arc<RegionMap>;

/// Opening regions.
#[derive(Debug, Default)]
pub(crate) struct OpeningRegions {
    regions: RwLock<HashMap<RegionId, Vec<OptionOutputTx>>>,
}

impl OpeningRegions {
    /// Registers the `sender` for an opening region and returns `None`;
    /// otherwise, returns the `sender` back to the caller.
    pub(crate) fn wait_for_opening_region(
        &self,
        region_id: RegionId,
        sender: OptionOutputTx,
    ) -> Option<OptionOutputTx> {
        let mut regions = self.regions.write().unwrap();
        match regions.entry(region_id) {
            Entry::Occupied(mut senders) => {
                senders.get_mut().push(sender);
                None
            }
            Entry::Vacant(_) => Some(sender),
        }
    }

    /// Returns true if the region is being opened.
    pub(crate) fn is_region_exists(&self, region_id: RegionId) -> bool {
        let regions = self.regions.read().unwrap();
        regions.contains_key(&region_id)
    }

    /// Inserts a new sender for the opening region into the map.
    pub(crate) fn insert_sender(&self, region: RegionId, sender: OptionOutputTx) {
        let mut regions = self.regions.write().unwrap();
        regions.insert(region, vec![sender]);
    }

    /// Removes the senders of the region by id and returns them.
    pub(crate) fn remove_sender(&self, region_id: RegionId) -> Vec<OptionOutputTx> {
        let mut regions = self.regions.write().unwrap();
        regions.remove(&region_id).unwrap_or_default()
    }

    #[cfg(test)]
    pub(crate) fn sender_len(&self, region_id: RegionId) -> usize {
        let regions = self.regions.read().unwrap();
        if let Some(senders) = regions.get(&region_id) {
            senders.len()
        } else {
            0
        }
    }
}

pub(crate) type OpeningRegionsRef = Arc<OpeningRegions>;

/// Manifest stats.
#[derive(Default, Debug, Clone)]
pub(crate) struct ManifestStats {
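    /// Total size of the region's manifest files in bytes.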
    total_manifest_size: Arc<AtomicU64>,
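    /// Latest version of the region's manifest.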
    manifest_version: Arc<AtomicU64>,
}

impl ManifestStats {
    fn total_manifest_size(&self) -> u64 {
        self.total_manifest_size.load(Ordering::Relaxed)
    }

    fn manifest_version(&self) -> u64 {
        self.manifest_version.load(Ordering::Relaxed)
    }
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::sync::atomic::AtomicU64;

    use common_datasource::compression::CompressionType;
    use common_test_util::temp_dir::create_temp_dir;
    use crossbeam_utils::atomic::AtomicCell;
    use object_store::ObjectStore;
    use object_store::services::Fs;
    use store_api::logstore::provider::Provider;
    use store_api::region_engine::RegionRole;
    use store_api::region_request::PathType;
    use store_api::storage::RegionId;

    use crate::access_layer::AccessLayer;
    use crate::manifest::action::RegionMetaActionList;
    use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
    use crate::region::{
        ManifestContext, ManifestStats, MitoRegion, RegionLeaderState, RegionRoleState,
    };
    use crate::sst::FormatType;
    use crate::sst::index::intermediate::IntermediateManager;
    use crate::sst::index::puffin_manager::PuffinManagerFactory;
    use crate::test_util::memtable_util::EmptyMemtableBuilder;
    use crate::test_util::scheduler_util::SchedulerEnv;
    use crate::test_util::version_util::VersionControlBuilder;
    use crate::time_provider::StdTimeProvider;

    #[test]
    fn test_region_state_lock_free() {
        assert!(AtomicCell::<RegionRoleState>::is_lock_free());
    }

    #[tokio::test]
    async fn test_set_region_state() {
        let env = SchedulerEnv::new().await;
        let builder = VersionControlBuilder::new();
        let version_control = Arc::new(builder.build());
        let manifest_ctx = env
            .mock_manifest_context(version_control.current().version.metadata.clone())
            .await;

        let region_id = RegionId::new(1024, 0);
        // Leader -> Follower
        manifest_ctx.set_role(RegionRole::Follower, region_id);
        assert_eq!(manifest_ctx.state.load(), RegionRoleState::Follower);

        // Follower -> Leader
        manifest_ctx.set_role(RegionRole::Leader, region_id);
        assert_eq!(
            manifest_ctx.state.load(),
            RegionRoleState::Leader(RegionLeaderState::Writable)
        );

        // Leader -> Downgrading Leader
        manifest_ctx.set_role(RegionRole::DowngradingLeader, region_id);
        assert_eq!(
            manifest_ctx.state.load(),
            RegionRoleState::Leader(RegionLeaderState::Downgrading)
        );

        // Downgrading Leader -> Follower
        manifest_ctx.set_role(RegionRole::Follower, region_id);
        assert_eq!(manifest_ctx.state.load(), RegionRoleState::Follower);

        // Can't downgrade from follower (Follower -> Downgrading Leader)
        manifest_ctx.set_role(RegionRole::DowngradingLeader, region_id);
        assert_eq!(manifest_ctx.state.load(), RegionRoleState::Follower);

        // Set region role to Downgrading Leader
        manifest_ctx.set_role(RegionRole::Leader, region_id);
        manifest_ctx.set_role(RegionRole::DowngradingLeader, region_id);
        assert_eq!(
            manifest_ctx.state.load(),
            RegionRoleState::Leader(RegionLeaderState::Downgrading)
        );

        // Downgrading Leader -> Leader
        manifest_ctx.set_role(RegionRole::Leader, region_id);
        assert_eq!(
            manifest_ctx.state.load(),
            RegionRoleState::Leader(RegionLeaderState::Writable)
        );
    }

    #[tokio::test]
    async fn test_staging_state_validation() {
        let env = SchedulerEnv::new().await;
        let builder = VersionControlBuilder::new();
        let version_control = Arc::new(builder.build());

        // Create context with staging state using the correct pattern from SchedulerEnv
        let staging_ctx = {
            let manager = RegionManifestManager::new(
                version_control.current().version.metadata.clone(),
                0,
                RegionManifestOptions {
                    manifest_dir: "".to_string(),
                    object_store: env.access_layer.object_store().clone(),
                    compress_type: CompressionType::Uncompressed,
                    checkpoint_distance: 10,
                    remove_file_options: Default::default(),
                },
                Default::default(),
                Default::default(),
                FormatType::PrimaryKey,
            )
            .await
            .unwrap();
            Arc::new(ManifestContext::new(
                manager,
                RegionRoleState::Leader(RegionLeaderState::Staging),
            ))
        };

        // Test staging state behavior
        assert_eq!(
            staging_ctx.current_state(),
            RegionRoleState::Leader(RegionLeaderState::Staging)
        );

        // Test writable context for comparison
        let writable_ctx = env
            .mock_manifest_context(version_control.current().version.metadata.clone())
            .await;

        assert_eq!(
            writable_ctx.current_state(),
            RegionRoleState::Leader(RegionLeaderState::Writable)
        );
    }

    #[tokio::test]
    async fn test_staging_state_transitions() {
        let builder = VersionControlBuilder::new();
        let version_control = Arc::new(builder.build());
        let metadata = version_control.current().version.metadata.clone();

        // Create MitoRegion for testing state transitions
        let temp_dir = create_temp_dir("");
        let path_str = temp_dir.path().display().to_string();
        let fs_builder = Fs::default().root(&path_str);
        let object_store = ObjectStore::new(fs_builder).unwrap().finish();

        let index_aux_path = temp_dir.path().join("index_aux");
        let puffin_mgr = PuffinManagerFactory::new(&index_aux_path, 4096, None, None)
            .await
            .unwrap();
        let intm_mgr = IntermediateManager::init_fs(index_aux_path.to_str().unwrap())
            .await
            .unwrap();

        let access_layer = Arc::new(AccessLayer::new(
            "",
            PathType::Bare,
            object_store,
            puffin_mgr,
            intm_mgr,
        ));

        let manager = RegionManifestManager::new(
            metadata.clone(),
            0,
            RegionManifestOptions {
                manifest_dir: "".to_string(),
                object_store: access_layer.object_store().clone(),
                compress_type: CompressionType::Uncompressed,
                checkpoint_distance: 10,
                remove_file_options: Default::default(),
            },
            Default::default(),
            Default::default(),
            FormatType::PrimaryKey,
        )
        .await
        .unwrap();

        let manifest_ctx = Arc::new(ManifestContext::new(
            manager,
            RegionRoleState::Leader(RegionLeaderState::Writable),
        ));

        let region = MitoRegion {
            region_id: metadata.region_id,
            version_control,
            access_layer,
            manifest_ctx: manifest_ctx.clone(),
            file_purger: crate::test_util::new_noop_file_purger(),
            provider: Provider::noop_provider(),
            last_flush_millis: Default::default(),
            last_compaction_millis: Default::default(),
            time_provider: Arc::new(StdTimeProvider),
            topic_latest_entry_id: Default::default(),
            written_bytes: Arc::new(AtomicU64::new(0)),
            memtable_builder: Arc::new(EmptyMemtableBuilder::default()),
            sst_format: FormatType::PrimaryKey,
            stats: ManifestStats::default(),
        };

        // Test initial state
        assert_eq!(
            region.state(),
            RegionRoleState::Leader(RegionLeaderState::Writable)
        );
        assert!(!region.is_staging());

        // Test transition to staging
        let mut manager = manifest_ctx.manifest_manager.write().await;
        region.set_staging(&mut manager).await.unwrap();
        drop(manager);
        assert_eq!(
            region.state(),
            RegionRoleState::Leader(RegionLeaderState::Staging)
        );
        assert!(region.is_staging());

        // Test transition back to writable
        region.exit_staging().unwrap();
        assert_eq!(
            region.state(),
            RegionRoleState::Leader(RegionLeaderState::Writable)
        );
        assert!(!region.is_staging());

        // Test staging directory cleanup: Create dirty staging files before entering staging mode
        {
            // Create some dummy staging manifest files to simulate an interrupted session
            let manager = manifest_ctx.manifest_manager.write().await;
            let dummy_actions = RegionMetaActionList::new(vec![]);
            let dummy_bytes = dummy_actions.encode().unwrap();

            // Create dirty staging files with versions 100 and 101
            manager.store().save(100, &dummy_bytes, true).await.unwrap();
            manager.store().save(101, &dummy_bytes, true).await.unwrap();
            drop(manager);

            // Verify dirty files exist before entering staging
            let manager = manifest_ctx.manifest_manager.read().await;
            let dirty_manifests = manager.store().fetch_staging_manifests().await.unwrap();
            assert_eq!(
                dirty_manifests.len(),
                2,
                "Should have 2 dirty staging files"
            );
            drop(manager);

            // Enter staging mode - this should clean up the dirty files
            let mut manager = manifest_ctx.manifest_manager.write().await;
            region.set_staging(&mut manager).await.unwrap();
            drop(manager);

            // Verify dirty files are cleaned up after entering staging
            let manager = manifest_ctx.manifest_manager.read().await;
            let cleaned_manifests = manager.store().fetch_staging_manifests().await.unwrap();
            assert_eq!(
                cleaned_manifests.len(),
                0,
                "Dirty staging files should be cleaned up"
            );
            drop(manager);

            // Exit staging to restore normal state for remaining tests
            region.exit_staging().unwrap();
        }

        // Test invalid transitions
        let mut manager = manifest_ctx.manifest_manager.write().await;
        assert!(region.set_staging(&mut manager).await.is_ok()); // Writable -> Staging should work
        drop(manager);
        let mut manager = manifest_ctx.manifest_manager.write().await;
        assert!(region.set_staging(&mut manager).await.is_err()); // Staging -> Staging should fail
        drop(manager);
        assert!(region.exit_staging().is_ok()); // Staging -> Writable should work
        assert!(region.exit_staging().is_err()); // Writable -> Writable should fail
    }
}
1422}