
mito2/region.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Mito region.

pub mod catchup;
pub mod opener;
pub mod options;
pub mod utils;
pub(crate) mod version;

use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
use std::sync::{Arc, Mutex, RwLock};

use common_base::hash::partition_expr_version;
use common_telemetry::{error, info, warn};
use crossbeam_utils::atomic::AtomicCell;
use partition::expr::PartitionExpr;
use snafu::{OptionExt, ResultExt, ensure};
use store_api::ManifestVersion;
use store_api::codec::PrimaryKeyEncoding;
use store_api::logstore::provider::Provider;
use store_api::metadata::RegionMetadataRef;
use store_api::region_engine::{
    RegionManifestInfo, RegionRole, RegionStatistic, SettableRegionRoleState,
};
use store_api::region_request::{PathType, StagingPartitionDirective};
use store_api::sst_entry::ManifestSstEntry;
use store_api::storage::{FileId, RegionId, SequenceNumber};
use tokio::sync::RwLockWriteGuard;
pub use utils::*;

use crate::access_layer::AccessLayerRef;
use crate::error::{
    InvalidPartitionExprSnafu, RegionNotFoundSnafu, RegionStateSnafu, RegionTruncatedSnafu, Result,
    UnexpectedSnafu, UpdateManifestSnafu,
};
use crate::manifest::action::{
    RegionChange, RegionManifest, RegionMetaAction, RegionMetaActionList,
};
use crate::manifest::manager::RegionManifestManager;
use crate::region::version::{VersionControlRef, VersionRef};
use crate::request::{OnFailure, OptionOutputTx};
use crate::sst::file::FileMeta;
use crate::sst::file_purger::FilePurgerRef;
use crate::sst::location::{index_file_path, sst_file_path};
use crate::time_provider::TimeProviderRef;

/// This is the approximate factor used to estimate the WAL size.
const ESTIMATED_WAL_FACTOR: f32 = 0.42825;

/// Region status includes region id, memtable usage, SST usage, WAL usage and manifest usage.
#[derive(Debug)]
pub struct RegionUsage {
    pub region_id: RegionId,
    pub wal_usage: u64,
    pub sst_usage: u64,
    pub manifest_usage: u64,
}

impl RegionUsage {
    pub fn disk_usage(&self) -> u64 {
        self.wal_usage + self.sst_usage + self.manifest_usage
    }
}
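// A minimal usage sketch (values are illustrative, not from the source):
// `disk_usage` is simply the sum of the WAL, SST and manifest components.
//
//     let usage = RegionUsage {
//         region_id: RegionId::new(1024, 1),
//         wal_usage: 4 << 20,
//         sst_usage: 64 << 20,
//         manifest_usage: 1 << 20,
//     };
//     assert_eq!(usage.disk_usage(), 69 << 20);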

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RegionLeaderState {
    /// The region is opened and is writable.
    Writable,
    /// The region is in staging mode: writable, but checkpoints and compactions are disabled.
    Staging,
    /// The region is entering staging mode; write requests will be stalled.
    EnteringStaging,
    /// The region is altering.
    Altering,
    /// The region is dropping.
    Dropping,
    /// The region is truncating.
    Truncating,
    /// The region is handling a region edit.
    Editing,
    /// The region is stepping down.
    Downgrading,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RegionRoleState {
    Leader(RegionLeaderState),
    Follower,
}

impl RegionRoleState {
    /// Converts the region role state to leader state if it is a leader state.
    pub fn into_leader_state(self) -> Option<RegionLeaderState> {
        match self {
            RegionRoleState::Leader(leader_state) => Some(leader_state),
            RegionRoleState::Follower => None,
        }
    }
}

/// Metadata and runtime status of a region.
///
/// Writing and reading a region follow a single-writer-multi-reader rule:
/// - Only the region worker thread this region belongs to can modify the metadata.
/// - Multiple reader threads are allowed to read a specific `version` of a region.
#[derive(Debug)]
pub struct MitoRegion {
    /// Id of this region.
    ///
    /// Accessing the region id from the version control is inconvenient, so
    /// we also store it here.
    pub(crate) region_id: RegionId,

    /// Version controller for this region.
    ///
    /// We MUST update the version control inside the write lock of the region manifest manager.
    pub(crate) version_control: VersionControlRef,
    /// SSTs accessor for this region.
    pub(crate) access_layer: AccessLayerRef,
    /// Context to maintain the manifest for this region.
    pub(crate) manifest_ctx: ManifestContextRef,
    /// SST file purger.
    pub(crate) file_purger: FilePurgerRef,
    /// The provider of the log store.
    pub(crate) provider: Provider,
    /// Last flush time in millis.
    last_flush_millis: AtomicI64,
    /// Last compaction time in millis.
    last_compaction_millis: AtomicI64,
    /// Provider to get the current time.
    time_provider: TimeProviderRef,
    /// The topic's latest entry id as of the region's last flush.
    /// **Only used for remote WAL pruning.**
    ///
    /// The value will be updated to the latest offset of the topic
    /// if the region receives a flush request or schedules a periodic flush task
    /// while the region's memtable is empty.
    ///
    /// There are no WAL entries in the range [flushed_entry_id, topic_latest_entry_id] for the
    /// current region, which means the WAL may be pruned up to `topic_latest_entry_id`.
    pub(crate) topic_latest_entry_id: AtomicU64,
    /// The total bytes written to the region.
    pub(crate) written_bytes: Arc<AtomicU64>,
    /// Manifest stats.
    stats: ManifestStats,
}

pub type MitoRegionRef = Arc<MitoRegion>;

#[derive(Debug, Clone)]
pub(crate) struct StagingPartitionInfo {
    pub(crate) partition_directive: StagingPartitionDirective,
    pub(crate) partition_rule_version: u64,
}

impl StagingPartitionInfo {
    /// Returns the partition expression carried by the staging directive, if any.
    pub(crate) fn partition_expr(&self) -> Option<&str> {
        self.partition_directive.partition_expr()
    }

    /// Builds staging partition info from a directive and derives its version marker.
    pub(crate) fn from_partition_directive(partition_directive: StagingPartitionDirective) -> Self {
        let partition_rule_version = match &partition_directive {
            StagingPartitionDirective::UpdatePartitionExpr(expr) => {
                partition_expr_version(Some(expr))
            }
            StagingPartitionDirective::RejectAllWrites => 0,
        };
        Self {
            partition_directive,
            partition_rule_version,
        }
    }
}
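// A hedged sketch of how the version marker is derived: an
// `UpdatePartitionExpr` directive hashes its expression through
// `partition_expr_version`, while `RejectAllWrites` pins the version to 0.
//
//     let info = StagingPartitionInfo::from_partition_directive(
//         StagingPartitionDirective::RejectAllWrites,
//     );
//     assert_eq!(info.partition_rule_version, 0);
//     assert!(info.partition_expr().is_none());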

impl MitoRegion {
    /// Stops background managers for this region.
    pub(crate) async fn stop(&self) {
        self.manifest_ctx
            .manifest_manager
            .write()
            .await
            .stop()
            .await;

        info!(
            "Stopped region manifest manager, region_id: {}",
            self.region_id
        );
    }

    /// Returns the current metadata of the region.
    pub fn metadata(&self) -> RegionMetadataRef {
        let version_data = self.version_control.current();
        version_data.version.metadata.clone()
    }

    /// Returns the primary key encoding of the region.
    pub(crate) fn primary_key_encoding(&self) -> PrimaryKeyEncoding {
        let version_data = self.version_control.current();
        version_data.version.metadata.primary_key_encoding
    }

    /// Returns the current version of the region.
    pub(crate) fn version(&self) -> VersionRef {
        let version_data = self.version_control.current();
        version_data.version
    }

    /// Returns the last flush timestamp in millis.
    pub(crate) fn last_flush_millis(&self) -> i64 {
        self.last_flush_millis.load(Ordering::Relaxed)
    }

    /// Updates the flush time to the current time.
    pub(crate) fn update_flush_millis(&self) {
        let now = self.time_provider.current_time_millis();
        self.last_flush_millis.store(now, Ordering::Relaxed);
    }

    /// Returns the last compaction timestamp in millis.
    pub(crate) fn last_compaction_millis(&self) -> i64 {
        self.last_compaction_millis.load(Ordering::Relaxed)
    }

    /// Updates the compaction time to the current time.
    pub(crate) fn update_compaction_millis(&self) {
        let now = self.time_provider.current_time_millis();
        self.last_compaction_millis.store(now, Ordering::Relaxed);
    }

    /// Returns the table dir.
    pub(crate) fn table_dir(&self) -> &str {
        self.access_layer.table_dir()
    }

    /// Returns the path type of the region.
    pub(crate) fn path_type(&self) -> PathType {
        self.access_layer.path_type()
    }

    /// Returns whether the region is writable.
    pub(crate) fn is_writable(&self) -> bool {
        matches!(
            self.manifest_ctx.state.load(),
            RegionRoleState::Leader(RegionLeaderState::Writable)
                | RegionRoleState::Leader(RegionLeaderState::Staging)
        )
    }

    /// Returns whether the region is flushable.
    pub(crate) fn is_flushable(&self) -> bool {
        matches!(
            self.manifest_ctx.state.load(),
            RegionRoleState::Leader(RegionLeaderState::Writable)
                | RegionRoleState::Leader(RegionLeaderState::Staging)
                | RegionRoleState::Leader(RegionLeaderState::Downgrading)
        )
    }

    /// Returns whether the region should abort index building.
    pub(crate) fn should_abort_index(&self) -> bool {
        matches!(
            self.manifest_ctx.state.load(),
            RegionRoleState::Follower
                | RegionRoleState::Leader(RegionLeaderState::Dropping)
                | RegionRoleState::Leader(RegionLeaderState::Truncating)
                | RegionRoleState::Leader(RegionLeaderState::Downgrading)
                | RegionRoleState::Leader(RegionLeaderState::Staging)
        )
    }

    /// Returns whether the region is downgrading.
    pub(crate) fn is_downgrading(&self) -> bool {
        matches!(
            self.manifest_ctx.state.load(),
            RegionRoleState::Leader(RegionLeaderState::Downgrading)
        )
    }

    /// Returns whether the region is in staging mode.
    pub(crate) fn is_staging(&self) -> bool {
        self.manifest_ctx.state.load() == RegionRoleState::Leader(RegionLeaderState::Staging)
    }

    /// Returns whether the region is entering staging mode.
    pub(crate) fn is_enter_staging(&self) -> bool {
        self.manifest_ctx.state.load()
            == RegionRoleState::Leader(RegionLeaderState::EnteringStaging)
    }

    pub fn region_id(&self) -> RegionId {
        self.region_id
    }

    pub fn find_committed_sequence(&self) -> SequenceNumber {
        self.version_control.committed_sequence()
    }

    /// Returns whether the region is a follower (readonly).
    pub fn is_follower(&self) -> bool {
        self.manifest_ctx.state.load() == RegionRoleState::Follower
    }

    /// Returns the state of the region.
    pub(crate) fn state(&self) -> RegionRoleState {
        self.manifest_ctx.state.load()
    }

    /// Sets the region role state.
    pub(crate) fn set_role(&self, next_role: RegionRole) {
        self.manifest_ctx.set_role(next_role, self.region_id);
    }

    pub(crate) fn region_role(&self) -> RegionRole {
        match self.state() {
            RegionRoleState::Follower => RegionRole::Follower,
            RegionRoleState::Leader(RegionLeaderState::Staging) => RegionRole::StagingLeader,
            RegionRoleState::Leader(RegionLeaderState::Downgrading) => {
                RegionRole::DowngradingLeader
            }
            RegionRoleState::Leader(_) => RegionRole::Leader,
        }
    }

    /// Sets the altering state.
    /// You should call this method in the worker loop.
    pub(crate) fn set_altering(&self) -> Result<()> {
        self.compare_exchange_state(
            RegionLeaderState::Writable,
            RegionRoleState::Leader(RegionLeaderState::Altering),
        )
    }

    /// Sets the dropping state.
    /// You should call this method in the worker loop.
    pub(crate) fn set_dropping(&self, expect: RegionLeaderState) -> Result<()> {
        self.compare_exchange_state(expect, RegionRoleState::Leader(RegionLeaderState::Dropping))
    }

    /// Sets the truncating state.
    /// You should call this method in the worker loop.
    pub(crate) fn set_truncating(&self) -> Result<()> {
        self.compare_exchange_state(
            RegionLeaderState::Writable,
            RegionRoleState::Leader(RegionLeaderState::Truncating),
        )
    }

    /// Sets the editing state.
    /// You should call this method in the worker loop.
    pub(crate) fn set_editing(&self, expect: RegionLeaderState) -> Result<()> {
        self.compare_exchange_state(expect, RegionRoleState::Leader(RegionLeaderState::Editing))
    }

    /// Sets the staging state.
    ///
    /// You should call this method in the worker loop.
    /// Transitions from Writable to Staging state.
    /// Cleans any existing staging manifests before entering staging mode.
    pub(crate) async fn set_staging(
        &self,
        manager: &mut RwLockWriteGuard<'_, RegionManifestManager>,
    ) -> Result<()> {
        manager.store().clear_staging_manifests().await?;

        self.compare_exchange_state(
            RegionLeaderState::Writable,
            RegionRoleState::Leader(RegionLeaderState::Staging),
        )
    }

    /// Sets the entering staging state.
    pub(crate) fn set_entering_staging(&self) -> Result<()> {
        self.compare_exchange_state(
            RegionLeaderState::Writable,
            RegionRoleState::Leader(RegionLeaderState::EnteringStaging),
        )
    }

    /// Exits the staging state back to writable.
    ///
    /// You should call this method in the worker loop.
    /// Transitions from Staging to Writable state.
    pub fn exit_staging(&self) -> Result<()> {
        self.manifest_ctx.exit_staging(
            self.region_id,
            RegionRoleState::Leader(RegionLeaderState::Writable),
        )
    }

    /// Sets the region role state gracefully. This acquires the manifest write lock.
    pub(crate) async fn set_role_state_gracefully(
        &self,
        state: SettableRegionRoleState,
    ) -> Result<()> {
        let mut manager: RwLockWriteGuard<'_, RegionManifestManager> =
            self.manifest_ctx.manifest_manager.write().await;
        let current_state = self.state();

        match state {
            SettableRegionRoleState::Leader => {
                // Exit staging mode and return to normal writable leader
                // Only allowed from staging state
                match current_state {
                    RegionRoleState::Leader(RegionLeaderState::Staging) => {
                        info!("Exiting staging mode for region {}", self.region_id);
                        // Use the success exit path that merges all staged manifests
                        self.exit_staging_on_success(&mut manager).await?;
                    }
                    RegionRoleState::Leader(RegionLeaderState::Writable) => {
                        // Already in desired state - no-op
                        info!("Region {} already in normal leader mode", self.region_id);
                    }
                    _ => {
                        // Only staging -> leader transition is allowed
                        return Err(RegionStateSnafu {
                            region_id: self.region_id,
                            state: current_state,
                            expect: RegionRoleState::Leader(RegionLeaderState::Staging),
                        }
                        .build());
                    }
                }
            }

            SettableRegionRoleState::StagingLeader => {
                // Enter staging mode from normal writable leader
                // Only allowed from writable leader state
                match current_state {
                    RegionRoleState::Leader(RegionLeaderState::Writable) => {
                        info!("Entering staging mode for region {}", self.region_id);
                        self.set_staging(&mut manager).await?;
                    }
                    RegionRoleState::Leader(RegionLeaderState::Staging) => {
                        // Already in desired state - no-op
                        info!("Region {} already in staging mode", self.region_id);
                    }
                    _ => {
                        return Err(RegionStateSnafu {
                            region_id: self.region_id,
                            state: current_state,
                            expect: RegionRoleState::Leader(RegionLeaderState::Writable),
                        }
                        .build());
                    }
                }
            }

            SettableRegionRoleState::Follower => {
                // Make this region a follower
                match current_state {
                    RegionRoleState::Leader(RegionLeaderState::Staging) => {
                        info!(
                            "Exiting staging and demoting region {} to follower",
                            self.region_id
                        );
                        self.exit_staging()?;
                        self.set_role(RegionRole::Follower);
                    }
                    RegionRoleState::Leader(_) => {
                        info!("Demoting region {} from leader to follower", self.region_id);
                        self.set_role(RegionRole::Follower);
                    }
                    RegionRoleState::Follower => {
                        // Already in desired state - no-op
                        info!("Region {} already in follower mode", self.region_id);
                    }
                }
            }

            SettableRegionRoleState::DowngradingLeader => {
                // downgrade this region to downgrading leader
                match current_state {
                    RegionRoleState::Leader(RegionLeaderState::Staging) => {
                        info!(
                            "Exiting staging and entering downgrade for region {}",
                            self.region_id
                        );
                        self.exit_staging()?;
                        self.set_role(RegionRole::DowngradingLeader);
                    }
                    RegionRoleState::Leader(RegionLeaderState::Writable) => {
                        info!("Starting downgrade for region {}", self.region_id);
                        self.set_role(RegionRole::DowngradingLeader);
                    }
                    RegionRoleState::Leader(RegionLeaderState::Downgrading) => {
                        // Already in desired state - no-op
                        info!("Region {} already in downgrading mode", self.region_id);
                    }
                    _ => {
                        warn!(
                            "Cannot start downgrade for region {} from state {:?}",
                            self.region_id, current_state
                        );
                    }
                }
            }
        }

        // Hack(zhongzc): If we have just become leader (writable), persist any backfilled metadata.
        if self.state() == RegionRoleState::Leader(RegionLeaderState::Writable) {
            // Persist backfilled metadata if manifest is missing fields (e.g., partition_expr)
            let manifest_meta = &manager.manifest().metadata;
            let current_version = self.version();
            let current_meta = &current_version.metadata;
            if manifest_meta.partition_expr.is_none() && current_meta.partition_expr.is_some() {
                let action = RegionMetaAction::Change(RegionChange {
                    metadata: current_meta.clone(),
                    sst_format: current_version.options.sst_format.unwrap_or_default(),
                    append_mode: None,
                });
                let result = manager
                    .update(RegionMetaActionList::with_action(action), false)
                    .await;

                match result {
                    Ok(version) => {
                        info!(
                            "Successfully persisted backfilled metadata for region {}, version: {}",
                            self.region_id, version
                        );
                    }
                    Err(e) => {
                        warn!(e; "Failed to persist backfilled metadata for region {}", self.region_id);
                    }
                }
            }
        }

        drop(manager);

        Ok(())
    }

    /// Switches the region state to `RegionRoleState::Leader(RegionLeaderState::Writable)` if the current state is `expect`.
    /// Otherwise, logs an error.
    pub(crate) fn switch_state_to_writable(&self, expect: RegionLeaderState) {
        if let Err(e) = self
            .compare_exchange_state(expect, RegionRoleState::Leader(RegionLeaderState::Writable))
        {
            error!(e; "failed to switch region state to writable, expect state is {:?}", expect);
        }
    }

    /// Switches the region state to `RegionRoleState::Leader(RegionLeaderState::Staging)` if the current state is `expect`.
    /// Otherwise, logs an error.
    pub(crate) fn switch_state_to_staging(&self, expect: RegionLeaderState) {
        if let Err(e) =
            self.compare_exchange_state(expect, RegionRoleState::Leader(RegionLeaderState::Staging))
        {
            error!(e; "failed to switch region state to staging, expect state is {:?}", expect);
        }
    }

    /// Returns the region statistic.
    pub(crate) fn region_statistic(&self) -> RegionStatistic {
        let version = self.version();
        let memtables = &version.memtables;
        let memtable_usage = (memtables.mutable_usage() + memtables.immutables_usage()) as u64;

        let sst_usage = version.ssts.sst_usage();
        let index_usage = version.ssts.index_usage();
        let flushed_entry_id = version.flushed_entry_id;

        let wal_usage = self.estimated_wal_usage(memtable_usage);
        let manifest_usage = self.stats.total_manifest_size();
        let num_rows = version.ssts.num_rows() + version.memtables.num_rows();
        let num_files = version.ssts.num_files();
        let manifest_version = self.stats.manifest_version();
        let file_removed_cnt = self.stats.file_removed_cnt();

        let topic_latest_entry_id = self.topic_latest_entry_id.load(Ordering::Relaxed);
        let written_bytes = self.written_bytes.load(Ordering::Relaxed);

        RegionStatistic {
            num_rows,
            memtable_size: memtable_usage,
            wal_size: wal_usage,
            manifest_size: manifest_usage,
            sst_size: sst_usage,
            sst_num: num_files,
            index_size: index_usage,
            manifest: RegionManifestInfo::Mito {
                manifest_version,
                flushed_entry_id,
                file_removed_cnt,
            },
            data_topic_latest_entry_id: topic_latest_entry_id,
            metadata_topic_latest_entry_id: topic_latest_entry_id,
            written_bytes,
        }
    }

    /// Estimates the WAL size in bytes from the memtable usage.
    fn estimated_wal_usage(&self, memtable_usage: u64) -> u64 {
        ((memtable_usage as f32) * ESTIMATED_WAL_FACTOR) as u64
    }
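    // Worked example: with `ESTIMATED_WAL_FACTOR` = 0.42825, a region holding
    // 100 MiB of memtable data is estimated to occupy roughly 42.8 MiB of WAL
    // (the exact value may vary slightly with f32 rounding):
    //
    //     let memtable_usage: u64 = 100 * 1024 * 1024;
    //     let wal = ((memtable_usage as f32) * ESTIMATED_WAL_FACTOR) as u64;
    //     assert!(wal > 44_000_000 && wal < 45_000_000);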

    /// Sets the state of the region to the given state if the current state
    /// equals the expected one.
    fn compare_exchange_state(
        &self,
        expect: RegionLeaderState,
        state: RegionRoleState,
    ) -> Result<()> {
        self.manifest_ctx
            .state
            .compare_exchange(RegionRoleState::Leader(expect), state)
            .map_err(|actual| {
                RegionStateSnafu {
                    region_id: self.region_id,
                    state: actual,
                    expect: RegionRoleState::Leader(expect),
                }
                .build()
            })?;
        Ok(())
    }
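    // A hedged usage sketch: the `set_*` helpers above all funnel through this
    // CAS. For example, `set_altering` is equivalent to:
    //
    //     self.compare_exchange_state(
    //         RegionLeaderState::Writable,
    //         RegionRoleState::Leader(RegionLeaderState::Altering),
    //     )?;
    //
    // and a concurrent state change surfaces as a region state error carrying
    // the actual state that was observed.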

    pub fn access_layer(&self) -> AccessLayerRef {
        self.access_layer.clone()
    }

    /// Returns the SST entries of the region.
    pub async fn manifest_sst_entries(&self) -> Vec<ManifestSstEntry> {
        let table_dir = self.table_dir();
        let path_type = self.access_layer.path_type();

        let visible_ssts = self
            .version()
            .ssts
            .levels()
            .iter()
            .flat_map(|level| level.files().map(|file| file.file_id().file_id()))
            .collect::<HashSet<_>>();

        let manifest_files = self.manifest_ctx.manifest().await.files.clone();
        let staging_files = self
            .manifest_ctx
            .staging_manifest()
            .await
            .map(|m| m.files.clone())
            .unwrap_or_default();
        let files = manifest_files
            .into_iter()
            .chain(staging_files)
            .collect::<HashMap<_, _>>();

        files
            .values()
            .map(|meta| {
                let region_id = self.region_id;
                let origin_region_id = meta.region_id;
                let (index_version, index_file_path, index_file_size) = if meta.index_file_size > 0
                {
                    let index_file_path = index_file_path(table_dir, meta.index_id(), path_type);
                    (
                        meta.index_version,
                        Some(index_file_path),
                        Some(meta.index_file_size),
                    )
                } else {
                    (0, None, None)
                };
                let visible = visible_ssts.contains(&meta.file_id);
                ManifestSstEntry {
                    table_dir: table_dir.to_string(),
                    region_id,
                    table_id: region_id.table_id(),
                    region_number: region_id.region_number(),
                    region_group: region_id.region_group(),
                    region_sequence: region_id.region_sequence(),
                    file_id: meta.file_id.to_string(),
                    index_version,
                    level: meta.level,
                    file_path: sst_file_path(table_dir, meta.file_id(), path_type),
                    file_size: meta.file_size,
                    index_file_path,
                    index_file_size,
                    num_rows: meta.num_rows,
                    num_row_groups: meta.num_row_groups,
                    num_series: Some(meta.num_series),
                    min_ts: meta.time_range.0,
                    max_ts: meta.time_range.1,
                    sequence: meta.sequence.map(|s| s.get()),
                    origin_region_id,
                    node_id: None,
                    visible,
                }
            })
            .collect()
    }

    /// Returns the file metas of the region by file ids.
    pub async fn file_metas(&self, file_ids: &[FileId]) -> Vec<Option<FileMeta>> {
        let manifest_files = self.manifest_ctx.manifest().await.files.clone();

        file_ids
            .iter()
            .map(|file_id| manifest_files.get(file_id).cloned())
            .collect::<Vec<_>>()
    }

    /// Exits staging mode successfully by merging all staged manifests and making them visible.
    pub(crate) async fn exit_staging_on_success(
        &self,
        manager: &mut RwLockWriteGuard<'_, RegionManifestManager>,
    ) -> Result<()> {
        let current_state = self.manifest_ctx.current_state();
        ensure!(
            current_state == RegionRoleState::Leader(RegionLeaderState::Staging),
            RegionStateSnafu {
                region_id: self.region_id,
                state: current_state,
                expect: RegionRoleState::Leader(RegionLeaderState::Staging),
            }
        );

        // Merge all staged manifest actions.
        let merged_actions = match manager.merge_staged_actions(current_state).await? {
            Some(actions) => actions,
            None => {
                info!(
                    "No staged manifests to merge for region {}, exiting staging mode without changes",
                    self.region_id
                );
                // Even if there are no manifests to merge, we still need to exit staging mode.
                self.exit_staging()?;
                return Ok(());
            }
        };
        let expect_change = merged_actions.actions.iter().any(|a| a.is_change());
        let expect_partition_expr_change = merged_actions
            .actions
            .iter()
            .any(|a| a.is_partition_expr_change());
        let expect_edit = merged_actions.actions.iter().any(|a| a.is_edit());
        ensure!(
            !(expect_change && expect_partition_expr_change),
            UnexpectedSnafu {
                reason: "unexpected both change and partition expr change actions in merged actions"
            }
        );
        ensure!(
            expect_change || expect_partition_expr_change,
            UnexpectedSnafu {
                reason: "expect a change or partition expr change action in merged actions"
            }
        );
        ensure!(
            expect_edit,
            UnexpectedSnafu {
                reason: "expect an edit action in merged actions"
            }
        );

        let (merged_partition_expr_change, merged_change, merged_edit) =
            merged_actions.clone().split_region_change_and_edit();
        if let Some(change) = &merged_change {
            // In staging exit we only allow metadata-only updates. A `Change`
            // action is accepted only when column definitions are unchanged;
            // otherwise it is treated as a schema change and rejected.
            let current_column_metadatas = &self.version().metadata.column_metadatas;
            ensure!(
                change.metadata.column_metadatas == *current_column_metadatas,
                UnexpectedSnafu {
                    reason: "change action alters column metadata in staging exit"
                }
            );
        }

        // Submit merged actions using the manifest manager's update method.
        // Pass `false` so it saves to the normal directory, not the staging one.
        let new_version = manager.update(merged_actions, false).await?;
        info!(
            "Successfully submitted merged staged manifests for region {}, new version: {}",
            self.region_id, new_version
        );

        // Apply the merged changes to the in-memory version control.
        if let Some(change) = merged_partition_expr_change {
            let mut new_metadata = self.version().metadata.as_ref().clone();
            new_metadata.set_partition_expr(change.partition_expr);
            self.version_control.alter_metadata(new_metadata.into());
        }
        if let Some(change) = merged_change {
            self.version_control.alter_metadata(change.metadata);
        }
        self.version_control
            .apply_edit(Some(merged_edit), &[], self.file_purger.clone());

        // Clear all staging manifests and transition the state.
        if let Err(e) = manager.clear_staging_manifest_and_dir().await {
            error!(e; "Failed to clear staging manifest dir for region {}", self.region_id);
        }
        self.exit_staging()?;

        Ok(())
    }

    /// Returns the partition expression string for this region.
    ///
    /// If the region is currently in staging state, this returns the partition expression held in
    /// the staging partition field. Otherwise, it returns the partition expression from the primary
    /// region metadata (current committed version).
    pub fn maybe_staging_partition_expr_str(&self) -> Option<String> {
        let is_staging = self.is_staging();
        if is_staging {
            let staging_partition_info = self.manifest_ctx.staging_partition_info();
            if staging_partition_info.is_none() {
                warn!(
                    "Staging partition expr is none for region {} in staging state",
                    self.region_id
                );
            }
            staging_partition_info
                .as_ref()
                .and_then(|info| info.partition_expr().map(ToString::to_string))
        } else {
            let version = self.version();
            version.metadata.partition_expr.clone()
        }
    }

    pub fn expected_partition_expr_version(&self) -> u64 {
        if self.is_staging() {
            self.manifest_ctx
                .staging_partition_info()
                .as_ref()
                .map(|info| info.partition_rule_version)
                .unwrap_or_default()
        } else {
            self.version().metadata.partition_expr_version
        }
    }

    /// Returns whether writes should be rejected for this region in staging mode.
    pub(crate) fn reject_all_writes_in_staging(&self) -> bool {
        if !self.is_staging() {
            return false;
        }
        self.manifest_ctx
            .staging_partition_info()
            .as_ref()
            .map(|info| {
                matches!(
                    info.partition_directive,
                    StagingPartitionDirective::RejectAllWrites
                )
            })
            .unwrap_or(false)
    }
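    // A minimal sketch of the directive check above, assuming the staging info
    // was installed when the region entered staging (illustrative, not a test
    // from the source):
    //
    //     region.manifest_ctx.set_staging_partition_info(
    //         StagingPartitionInfo::from_partition_directive(
    //             StagingPartitionDirective::RejectAllWrites,
    //         ),
    //     );
    //     // While the region stays in `Staging`, all writes are rejected:
    //     assert!(region.reject_all_writes_in_staging());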
}

/// Context to update the region manifest.
#[derive(Debug)]
pub(crate) struct ManifestContext {
    /// Manager to maintain the manifest for this region.
    pub(crate) manifest_manager: tokio::sync::RwLock<RegionManifestManager>,
    /// The state of the region. The region checks the state before updating
    /// the manifest.
    state: AtomicCell<RegionRoleState>,
    /// Partition info of the region in staging mode.
    ///
    /// During staging mode, the region metadata in [`VersionControlRef`] is not updated,
    /// so we need to store the partition info separately.
    staging_partition_info: Mutex<Option<StagingPartitionInfo>>,
}

impl ManifestContext {
    pub(crate) fn new(manager: RegionManifestManager, state: RegionRoleState) -> Self {
        ManifestContext {
            manifest_manager: tokio::sync::RwLock::new(manager),
            state: AtomicCell::new(state),
            staging_partition_info: Mutex::new(None),
        }
    }

    pub(crate) fn staging_partition_info(&self) -> Option<StagingPartitionInfo> {
        self.staging_partition_info.lock().unwrap().clone()
    }

    pub(crate) fn set_staging_partition_info(&self, staging_partition_info: StagingPartitionInfo) {
        let mut current = self.staging_partition_info.lock().unwrap();
        debug_assert!(current.is_none());
        *current = Some(staging_partition_info);
    }

    fn clear_staging_partition_info(&self) {
        *self.staging_partition_info.lock().unwrap() = None;
    }

    pub(crate) fn exit_staging(
        &self,
        region_id: RegionId,
        next_state: RegionRoleState,
    ) -> Result<()> {
        self.state
            .compare_exchange(
                RegionRoleState::Leader(RegionLeaderState::Staging),
                next_state,
            )
            .map_err(|actual| {
                RegionStateSnafu {
                    region_id,
                    state: actual,
                    expect: RegionRoleState::Leader(RegionLeaderState::Staging),
                }
                .build()
            })?;
        self.clear_staging_partition_info();
        Ok(())
    }

    pub(crate) async fn manifest_version(&self) -> ManifestVersion {
        self.manifest_manager
            .read()
            .await
            .manifest()
            .manifest_version
    }

    pub(crate) async fn has_update(&self) -> Result<bool> {
        self.manifest_manager.read().await.has_update().await
    }

    /// Returns the current region role state.
    pub(crate) fn current_state(&self) -> RegionRoleState {
        self.state.load()
    }

    /// Installs the manifest changes from the current version to the target version (inclusive).
    ///
    /// Returns the installed [RegionManifest].
    /// **Note**: This function is not guaranteed to install the target version strictly.
    /// The installed version may be greater than the target version.
    pub(crate) async fn install_manifest_to(
        &self,
        version: ManifestVersion,
    ) -> Result<Arc<RegionManifest>> {
        let mut manager = self.manifest_manager.write().await;
        manager.install_manifest_to(version).await?;

        Ok(manager.manifest())
    }

    /// Updates the manifest if current state is `expect_state`.
    pub(crate) async fn update_manifest(
        &self,
        expect_state: RegionLeaderState,
        action_list: RegionMetaActionList,
        is_staging: bool,
    ) -> Result<ManifestVersion> {
        // Acquires the write lock of the manifest manager.
        let mut manager = self.manifest_manager.write().await;
        // Gets current manifest.
        let manifest = manager.manifest();
        // Checks state inside the lock. This is to ensure that we won't update the manifest
        // after `set_readonly_gracefully()` is called.
        let current_state = self.state.load();

        // If expect_state is not downgrading, the current state must be either `expect_state` or downgrading.
        //
        // A downgrading leader rejects user writes but still allows
        // flushing the memtable and updating the manifest.
        if expect_state != RegionLeaderState::Downgrading {
            if current_state == RegionRoleState::Leader(RegionLeaderState::Downgrading) {
                info!(
                    "Region {} is in downgrading leader state, updating manifest. state is {:?}",
                    manifest.metadata.region_id, expect_state
                );
            }
            ensure!(
                current_state == RegionRoleState::Leader(expect_state)
                    || current_state == RegionRoleState::Leader(RegionLeaderState::Downgrading),
                UpdateManifestSnafu {
                    region_id: manifest.metadata.region_id,
                    state: current_state,
                }
            );
        } else {
            ensure!(
                current_state == RegionRoleState::Leader(expect_state),
                RegionStateSnafu {
                    region_id: manifest.metadata.region_id,
                    state: current_state,
                    expect: RegionRoleState::Leader(expect_state),
                }
            );
        }

        for action in &action_list.actions {
            // Checks whether the edit is still applicable.
            let RegionMetaAction::Edit(edit) = &action else {
                continue;
            };

            // Checks whether the region is truncated.
            let Some(truncated_entry_id) = manifest.truncated_entry_id else {
                continue;
            };

            // This is an edit from flush.
            if let Some(flushed_entry_id) = edit.flushed_entry_id {
                // A flush edit is valid after truncate in two cases:
                // 1. `flushed_entry_id` moves past `truncated_entry_id`, meaning it definitely
                //    flushed data newer than the truncate point.
                // 2. `flushed_entry_id` equals `truncated_entry_id`, but `flushed_sequence`
                //    increases. This happens in skip-WAL tables where the entry id can stay at 0,
                //    while the sequence still advances for post-truncate writes.
                //
                // We still reject stale flushes from before truncate:
                // if the entry id is equal and the sequence does not advance, the flush is outdated.
                let is_newer_entry = truncated_entry_id < flushed_entry_id;
                let is_same_entry_with_newer_sequence = truncated_entry_id == flushed_entry_id
                    && edit.flushed_sequence.is_some_and(|flushed_sequence| {
                        manifest.flushed_sequence < flushed_sequence
                    });

                ensure!(
                    is_newer_entry || is_same_entry_with_newer_sequence,
                    RegionTruncatedSnafu {
                        region_id: manifest.metadata.region_id,
                    }
                );
            }

            // This is an edit from compaction.
            if !edit.files_to_remove.is_empty() {
                // Ensure the input files of the compaction task have not been truncated away.
                for file in &edit.files_to_remove {
                    ensure!(
                        manifest.files.contains_key(&file.file_id),
                        RegionTruncatedSnafu {
                            region_id: manifest.metadata.region_id,
                        }
                    );
                }
            }
        }

        // Now we can update the manifest.
        let version = manager.update(action_list, is_staging).await.inspect_err(
            |e| error!(e; "Failed to update manifest, region_id: {}", manifest.metadata.region_id),
        )?;

        if self.state.load() == RegionRoleState::Follower {
            warn!(
                "Region {} becomes follower while updating manifest which may cause inconsistency, manifest version: {version}",
                manifest.metadata.region_id
            );
        }

        Ok(version)
    }

    /// Sets the [`RegionRole`].
    ///
    /// ```text
    ///                  +---------------------+
    ///                  |   Staging Leader    |
    ///                  +----------+----------+
    ///                             |
    ///                             v
    ///     +----------+     +------+-------+     +-------------+
    ///     | Follower | <-> |    Leader    | <-> | Downgrading |
    ///     +-----+----+     +------+-------+     +------+------+
    ///           ^                 ^                    |
    ///           +-----------------+--------------------+
    ///
    /// ```
    ///
    /// # State Transitions
    ///
    /// From `Follower`:
    /// - `Follower -> Leader`
    ///
    /// From `Leader`:
    /// - `Leader -> Follower`
    /// - `Leader -> Downgrading Leader`
    ///
    /// From `Staging Leader`:
    /// - `Staging Leader -> Leader`
    /// - `Staging Leader -> Follower`
    /// - `Staging Leader -> Downgrading Leader`
    ///
    /// From `Downgrading Leader`:
    /// - `Downgrading Leader -> Leader`
    /// - `Downgrading Leader -> Follower`
    pub(crate) fn set_role(&self, next_role: RegionRole, region_id: RegionId) {
        match next_role {
            RegionRole::Follower => {
                if self
                    .exit_staging(region_id, RegionRoleState::Follower)
                    .is_ok()
                {
                    info!(
                        "Convert region {} to follower, previous role state: {:?}",
                        region_id,
                        RegionRoleState::Leader(RegionLeaderState::Staging)
                    );
                    return;
                }
                match self.state.fetch_update(|state| {
                    if !matches!(state, RegionRoleState::Follower) {
                        Some(RegionRoleState::Follower)
                    } else {
                        None
                    }
                }) {
                    Ok(state) => info!(
                        "Convert region {} to follower, previous role state: {:?}",
                        region_id, state
                    ),
                    Err(state) => {
                        if state != RegionRoleState::Follower {
                            warn!(
                                "Failed to convert region {} to follower, current role state: {:?}",
                                region_id, state
                            )
                        }
                    }
                }
            }
            RegionRole::Leader => {
                if self
                    .exit_staging(
                        region_id,
                        RegionRoleState::Leader(RegionLeaderState::Writable),
                    )
                    .is_ok()
                {
                    info!(
                        "Convert region {} to leader, previous role state: {:?}",
                        region_id,
                        RegionRoleState::Leader(RegionLeaderState::Staging)
                    );
                    return;
                }
                match self.state.fetch_update(|state| {
                    if matches!(
                        state,
                        RegionRoleState::Follower
                            | RegionRoleState::Leader(RegionLeaderState::Downgrading)
                    ) {
                        Some(RegionRoleState::Leader(RegionLeaderState::Writable))
                    } else {
                        None
                    }
                }) {
                    Ok(state) => info!(
                        "Convert region {} to leader, previous role state: {:?}",
                        region_id, state
                    ),
                    Err(state) => {
                        if state != RegionRoleState::Leader(RegionLeaderState::Writable) {
                            warn!(
                                "Failed to convert region {} to leader, current role state: {:?}",
                                region_id, state
                            )
                        }
                    }
                }
            }
            RegionRole::StagingLeader => {
                info!(
                    "Ignore direct conversion of region {} to staging leader; staging requires the dedicated workflow",
                    region_id
                );
            }
            RegionRole::DowngradingLeader => {
                if self
                    .exit_staging(
                        region_id,
                        RegionRoleState::Leader(RegionLeaderState::Downgrading),
                    )
                    .is_ok()
                {
                    info!(
                        "Convert region {} to downgrading region, previous role state: {:?}",
                        region_id,
                        RegionRoleState::Leader(RegionLeaderState::Staging)
                    );
                    return;
                }
                match self.state.compare_exchange(
                    RegionRoleState::Leader(RegionLeaderState::Writable),
                    RegionRoleState::Leader(RegionLeaderState::Downgrading),
                ) {
                    Ok(state) => info!(
                        "Convert region {} to downgrading region, previous role state: {:?}",
                        region_id, state
                    ),
                    Err(state) => {
                        if state != RegionRoleState::Leader(RegionLeaderState::Downgrading) {
                            warn!(
                                "Failed to convert region {} to downgrading leader, current role state: {:?}",
                                region_id, state
                            )
                        }
                    }
                }
            }
        }
    }
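    // Illustrative transitions following the diagram above (a sketch, not code
    // from the source); starting from a writable leader:
    //
    //     ctx.set_role(RegionRole::DowngradingLeader, region_id); // Writable -> Downgrading
    //     ctx.set_role(RegionRole::Follower, region_id);          // Downgrading -> Follower
    //     ctx.set_role(RegionRole::Leader, region_id);            // Follower -> Writable
    //
    // `RegionRole::StagingLeader` is deliberately ignored here; staging is
    // entered via `set_role_state_gracefully(SettableRegionRoleState::StagingLeader)`.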

    /// Returns the normal manifest of the region.
    pub(crate) async fn manifest(&self) -> Arc<crate::manifest::action::RegionManifest> {
        self.manifest_manager.read().await.manifest()
    }

    /// Returns the staging manifest of the region.
    pub(crate) async fn staging_manifest(
        &self,
    ) -> Option<Arc<crate::manifest::action::RegionManifest>> {
        self.manifest_manager.read().await.staging_manifest()
    }
}

pub(crate) type ManifestContextRef = Arc<ManifestContext>;

/// Regions indexed by ids.
#[derive(Debug, Default)]
pub(crate) struct RegionMap {
    regions: RwLock<HashMap<RegionId, MitoRegionRef>>,
}

impl RegionMap {
    /// Returns true if the region exists.
    pub(crate) fn is_region_exists(&self, region_id: RegionId) -> bool {
        let regions = self.regions.read().unwrap();
        regions.contains_key(&region_id)
    }

    /// Inserts a new region into the map.
    pub(crate) fn insert_region(&self, region: MitoRegionRef) {
        let mut regions = self.regions.write().unwrap();
        regions.insert(region.region_id, region);
    }

    /// Gets region by region id.
    pub(crate) fn get_region(&self, region_id: RegionId) -> Option<MitoRegionRef> {
        let regions = self.regions.read().unwrap();
        regions.get(&region_id).cloned()
    }

    /// Gets writable region by region id.
    ///
    /// Returns error if the region does not exist or is readonly.
    pub(crate) fn writable_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
        let region = self
            .get_region(region_id)
            .context(RegionNotFoundSnafu { region_id })?;
        ensure!(
            region.is_writable(),
            RegionStateSnafu {
                region_id,
                state: region.state(),
                expect: RegionRoleState::Leader(RegionLeaderState::Writable),
            }
        );
        Ok(region)
    }

    /// Gets readonly region by region id.
    ///
    /// Returns error if the region does not exist or is writable.
    pub(crate) fn follower_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
        let region = self
            .get_region(region_id)
            .context(RegionNotFoundSnafu { region_id })?;
        ensure!(
            region.is_follower(),
            RegionStateSnafu {
                region_id,
                state: region.state(),
                expect: RegionRoleState::Follower,
            }
        );

        Ok(region)
    }

    /// Gets region by region id.
    ///
    /// Calls the callback if the region does not exist.
    pub(crate) fn get_region_or<F: OnFailure>(
        &self,
        region_id: RegionId,
        cb: &mut F,
    ) -> Option<MitoRegionRef> {
        match self
            .get_region(region_id)
            .context(RegionNotFoundSnafu { region_id })
        {
            Ok(region) => Some(region),
            Err(e) => {
                cb.on_failure(e);
                None
            }
        }
    }

    /// Gets writable region by region id.
    ///
    /// Calls the callback if the region does not exist or is readonly.
    pub(crate) fn writable_region_or<F: OnFailure>(
        &self,
        region_id: RegionId,
        cb: &mut F,
    ) -> Option<MitoRegionRef> {
        match self.writable_region(region_id) {
            Ok(region) => Some(region),
            Err(e) => {
                cb.on_failure(e);
                None
            }
        }
    }

    /// Gets writable non-staging region by region id.
    ///
    /// Returns error if the region does not exist, is readonly, or is in staging mode.
    pub(crate) fn writable_non_staging_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
        let region = self.writable_region(region_id)?;
        if region.is_staging() {
            return Err(crate::error::RegionStateSnafu {
                region_id,
                state: region.state(),
                expect: RegionRoleState::Leader(RegionLeaderState::Writable),
            }
            .build());
        }
        Ok(region)
    }

    /// Gets staging region by region id.
    ///
    /// Returns error if the region does not exist or is not in staging state.
    pub(crate) fn staging_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
        let region = self
            .get_region(region_id)
            .context(RegionNotFoundSnafu { region_id })?;
        ensure!(
            region.is_staging(),
            RegionStateSnafu {
                region_id,
                state: region.state(),
                expect: RegionRoleState::Leader(RegionLeaderState::Staging),
            }
        );
        Ok(region)
    }
1372
1373    /// Gets flushable region by region id.
1374    ///
1375    /// Returns error if the region does not exist.
1376    /// Returns None if the region exists but not operatable.
1377    fn flushable_region(&self, region_id: RegionId) -> Result<Option<MitoRegionRef>> {
1378        let region = self
1379            .get_region(region_id)
1380            .context(RegionNotFoundSnafu { region_id })?;
1381        if region.is_flushable() {
1382            Ok(Some(region))
1383        } else {
1384            Ok(None)
1385        }
1386    }
1387
1388    /// Gets flushable region by region id.
1389    ///
1390    /// Calls the callback if the region does not exist.
1391    /// Returns `None` if the region exists but is not flushable.
1392    pub(crate) fn flushable_region_or<F: OnFailure>(
1393        &self,
1394        region_id: RegionId,
1395        cb: &mut F,
1396    ) -> Option<MitoRegionRef> {
1397        match self.flushable_region(region_id) {
1398            Ok(region) => region,
1399            Err(e) => {
1400                cb.on_failure(e);
1401                None
1402            }
1403        }
1404    }
1405
1406    /// Removes a region by id.
1407    pub(crate) fn remove_region(&self, region_id: RegionId) -> Option<MitoRegionRef> {
1408        let mut regions = self.regions.write().unwrap();
1409        regions.remove(&region_id)
1410    }
1411
1412    /// Lists all regions.
1413    pub(crate) fn list_regions(&self) -> Vec<MitoRegionRef> {
1414        let regions = self.regions.read().unwrap();
1415        regions.values().cloned().collect()
1416    }
1417
1418    /// Clears the map.
1419    pub(crate) fn clear(&self) {
1420        self.regions.write().unwrap().clear();
1421    }
1422}
1423
1424pub(crate) type RegionMapRef = Arc<RegionMap>;
1425
1426/// Tracks regions that are being opened and the senders waiting on them.
1427#[derive(Debug, Default)]
1428pub(crate) struct OpeningRegions {
1429    regions: RwLock<HashMap<RegionId, Vec<OptionOutputTx>>>,
1430}
1431
1432impl OpeningRegions {
1433    /// Registers `sender` for a region that is being opened and returns `None`; otherwise returns `sender` back to the caller.
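    ///
    /// A minimal sketch of the intended call pattern (`ignore`d because it
    /// needs a running engine; the handling in the arms is an assumption):
    ///
    /// ```ignore
    /// match opening_regions.wait_for_opening_region(region_id, sender) {
    ///     // Another open is in flight; our sender was queued behind it.
    ///     None => { /* wait to be notified when the open finishes */ }
    ///     // No open in flight; we got the sender back and must open the
    ///     // region ourselves, then notify through the sender.
    ///     Some(sender) => { /* open the region, then use `sender` */ }
    /// }
    /// ```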
1434    pub(crate) fn wait_for_opening_region(
1435        &self,
1436        region_id: RegionId,
1437        sender: OptionOutputTx,
1438    ) -> Option<OptionOutputTx> {
1439        let mut regions = self.regions.write().unwrap();
1440        match regions.entry(region_id) {
1441            Entry::Occupied(mut senders) => {
1442                senders.get_mut().push(sender);
1443                None
1444            }
1445            Entry::Vacant(_) => Some(sender),
1446        }
1447    }
1448
1449    /// Returns true if the region is being opened.
1450    pub(crate) fn is_region_exists(&self, region_id: RegionId) -> bool {
1451        let regions = self.regions.read().unwrap();
1452        regions.contains_key(&region_id)
1453    }
1454
1455    /// Inserts a new opening region with its first waiting `sender`.
1456    pub(crate) fn insert_sender(&self, region: RegionId, sender: OptionOutputTx) {
1457        let mut regions = self.regions.write().unwrap();
1458        regions.insert(region, vec![sender]);
1459    }
1460
1461    /// Removes and returns the senders waiting on a region by id.
1462    pub(crate) fn remove_sender(&self, region_id: RegionId) -> Vec<OptionOutputTx> {
1463        let mut regions = self.regions.write().unwrap();
1464        regions.remove(&region_id).unwrap_or_default()
1465    }
1466
1467    #[cfg(test)]
1468    pub(crate) fn sender_len(&self, region_id: RegionId) -> usize {
1469        let regions = self.regions.read().unwrap();
1470        if let Some(senders) = regions.get(&region_id) {
1471            senders.len()
1472        } else {
1473            0
1474        }
1475    }
1476}
1477
1478pub(crate) type OpeningRegionsRef = Arc<OpeningRegions>;
1479
1480/// The regions that are catching up.
1481#[derive(Debug, Default)]
1482pub(crate) struct CatchupRegions {
1483    regions: RwLock<HashSet<RegionId>>,
1484}
1485
1486impl CatchupRegions {
1487    /// Returns true if the region is catching up.
1488    pub(crate) fn is_region_exists(&self, region_id: RegionId) -> bool {
1489        let regions = self.regions.read().unwrap();
1490        regions.contains(&region_id)
1491    }
1492
1493    /// Inserts a new region into the set.
1494    pub(crate) fn insert_region(&self, region_id: RegionId) {
1495        let mut regions = self.regions.write().unwrap();
1496        regions.insert(region_id);
1497    }
1498
1499    /// Removes a region by id.
1500    pub(crate) fn remove_region(&self, region_id: RegionId) {
1501        let mut regions = self.regions.write().unwrap();
1502        regions.remove(&region_id);
1503    }
1504}
1505
1506pub(crate) type CatchupRegionsRef = Arc<CatchupRegions>;
1507
1508/// Manifest stats, kept as shared atomic counters.
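///
/// A minimal sketch of the intended sharing pattern, under the assumption that
/// the manifest manager holds clones of these `Arc`s and bumps them as it
/// writes manifests (the `fetch_add` call site below is illustrative):
///
/// ```ignore
/// use std::sync::atomic::Ordering;
///
/// let stats = ManifestStats::default();
/// let size = stats.total_manifest_size.clone(); // handed to the manager
/// size.fetch_add(4096, Ordering::Relaxed);      // manager records new bytes
/// assert_eq!(stats.total_manifest_size.load(Ordering::Relaxed), 4096);
/// ```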
1509#[derive(Default, Debug, Clone)]
1510pub struct ManifestStats {
1511    pub(crate) total_manifest_size: Arc<AtomicU64>,
1512    pub(crate) manifest_version: Arc<AtomicU64>,
1513    pub(crate) file_removed_cnt: Arc<AtomicU64>,
1514}
1515
1516impl ManifestStats {
1517    fn total_manifest_size(&self) -> u64 {
1518        self.total_manifest_size.load(Ordering::Relaxed)
1519    }
1520
1521    fn manifest_version(&self) -> u64 {
1522        self.manifest_version.load(Ordering::Relaxed)
1523    }
1524
1525    fn file_removed_cnt(&self) -> u64 {
1526        self.file_removed_cnt.load(Ordering::Relaxed)
1527    }
1528}
1529
1530/// Parses the partition expression from a JSON string, treating `None` or an empty string as no expression.
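///
/// A minimal sketch of the expected behavior (the JSON payload shown is
/// illustrative, not a real partition expression):
///
/// ```ignore
/// assert!(parse_partition_expr(None).unwrap().is_none());
/// assert!(parse_partition_expr(Some("")).unwrap().is_none());
/// // A non-empty string must be valid `PartitionExpr` JSON, otherwise an
/// // `InvalidPartitionExpr` error is returned.
/// let expr = parse_partition_expr(Some(r#"{"and": [...]}"#));
/// ```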
1531pub fn parse_partition_expr(partition_expr_str: Option<&str>) -> Result<Option<PartitionExpr>> {
1532    match partition_expr_str {
1533        None => Ok(None),
1534        Some("") => Ok(None),
1535        Some(json_str) => {
1536            let expr = PartitionExpr::from_json_str(json_str)
1537                .with_context(|_| InvalidPartitionExprSnafu { expr: json_str })?;
1538            Ok(expr)
1539        }
1540    }
1541}
1542
1543#[cfg(test)]
1544mod tests {
1545    use std::sync::Arc;
1546    use std::sync::atomic::AtomicU64;
1547
1548    use common_datasource::compression::CompressionType;
1549    use common_test_util::temp_dir::create_temp_dir;
1550    use crossbeam_utils::atomic::AtomicCell;
1551    use object_store::ObjectStore;
1552    use object_store::services::Fs;
1553    use store_api::logstore::provider::Provider;
1554    use store_api::region_engine::RegionRole;
1555    use store_api::region_request::PathType;
1556    use store_api::storage::RegionId;
1557
1558    use crate::access_layer::AccessLayer;
1559    use crate::error::Error;
1560    use crate::manifest::action::{
1561        RegionChange, RegionEdit, RegionMetaAction, RegionMetaActionList, RegionPartitionExprChange,
1562    };
1563    use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
1564    use crate::region::{
1565        ManifestContext, ManifestStats, MitoRegion, RegionLeaderState, RegionRoleState,
1566    };
1567    use crate::sst::FormatType;
1568    use crate::sst::index::intermediate::IntermediateManager;
1569    use crate::sst::index::puffin_manager::PuffinManagerFactory;
1570    use crate::test_util::scheduler_util::SchedulerEnv;
1571    use crate::test_util::version_util::VersionControlBuilder;
1572    use crate::time_provider::StdTimeProvider;
1573
1574    #[test]
1575    fn test_region_state_lock_free() {
1576        assert!(AtomicCell::<RegionRoleState>::is_lock_free());
1577    }
1578
1579    async fn build_test_region(env: &SchedulerEnv) -> MitoRegion {
1580        let builder = VersionControlBuilder::new();
1581        let version_control = Arc::new(builder.build());
1582        let metadata = version_control.current().version.metadata.clone();
1583
1584        let manager = RegionManifestManager::new(
1585            metadata.clone(),
1586            0,
1587            RegionManifestOptions {
1588                manifest_dir: "".to_string(),
1589                object_store: env.access_layer.object_store().clone(),
1590                compress_type: CompressionType::Uncompressed,
1591                checkpoint_distance: 10,
1592                remove_file_options: Default::default(),
1593                manifest_cache: None,
1594            },
1595            FormatType::PrimaryKey,
1596            &Default::default(),
1597        )
1598        .await
1599        .unwrap();
1600
1601        let manifest_ctx = Arc::new(ManifestContext::new(
1602            manager,
1603            RegionRoleState::Leader(RegionLeaderState::Writable),
1604        ));
1605
1606        MitoRegion {
1607            region_id: metadata.region_id,
1608            version_control,
1609            access_layer: env.access_layer.clone(),
1610            manifest_ctx,
1611            file_purger: crate::test_util::new_noop_file_purger(),
1612            provider: Provider::noop_provider(),
1613            last_flush_millis: Default::default(),
1614            last_compaction_millis: Default::default(),
1615            time_provider: Arc::new(StdTimeProvider),
1616            topic_latest_entry_id: Default::default(),
1617            written_bytes: Arc::new(AtomicU64::new(0)),
1618            stats: ManifestStats::default(),
1619        }
1620    }
1621
1622    fn empty_edit() -> RegionEdit {
1623        RegionEdit {
1624            files_to_add: Vec::new(),
1625            files_to_remove: Vec::new(),
1626            timestamp_ms: None,
1627            compaction_time_window: None,
1628            flushed_entry_id: None,
1629            flushed_sequence: None,
1630            committed_sequence: None,
1631        }
1632    }
1633
1634    #[tokio::test]
1635    async fn test_exit_staging_partition_expr_change_and_edit_success() {
1636        let env = SchedulerEnv::new().await;
1637        let region = build_test_region(&env).await;
1638
1639        let mut manager = region.manifest_ctx.manifest_manager.write().await;
1640        region.set_staging(&mut manager).await.unwrap();
1641        manager
1642            .update(
1643                RegionMetaActionList::new(vec![
1644                    RegionMetaAction::PartitionExprChange(RegionPartitionExprChange {
1645                        partition_expr: Some("expr_a".to_string()),
1646                    }),
1647                    RegionMetaAction::Edit(empty_edit()),
1648                ]),
1649                true,
1650            )
1651            .await
1652            .unwrap();
1653
1654        region.exit_staging_on_success(&mut manager).await.unwrap();
1655        drop(manager);
1656
1657        assert_eq!(
1658            region.version().metadata.partition_expr.as_deref(),
1659            Some("expr_a")
1660        );
1661        assert_eq!(
1662            region.state(),
1663            RegionRoleState::Leader(RegionLeaderState::Writable)
1664        );
1665    }
1666
1667    #[tokio::test]
1668    async fn test_exit_staging_change_with_same_columns_success() {
1669        let env = SchedulerEnv::new().await;
1670        let region = build_test_region(&env).await;
1671
1672        let mut manager = region.manifest_ctx.manifest_manager.write().await;
1673        region.set_staging(&mut manager).await.unwrap();
1674
1675        let mut changed_metadata = region.version().metadata.as_ref().clone();
1676        changed_metadata.set_partition_expr(Some("expr_b".to_string()));
1677
1678        manager
1679            .update(
1680                RegionMetaActionList::new(vec![
1681                    RegionMetaAction::Change(RegionChange {
1682                        metadata: Arc::new(changed_metadata),
1683                        sst_format: FormatType::PrimaryKey,
1684                        append_mode: None,
1685                    }),
1686                    RegionMetaAction::Edit(empty_edit()),
1687                ]),
1688                true,
1689            )
1690            .await
1691            .unwrap();
1692
1693        region.exit_staging_on_success(&mut manager).await.unwrap();
1694        drop(manager);
1695
1696        assert_eq!(
1697            region.version().metadata.partition_expr.as_deref(),
1698            Some("expr_b")
1699        );
1700        assert_eq!(
1701            region.state(),
1702            RegionRoleState::Leader(RegionLeaderState::Writable)
1703        );
1704    }
1705
1706    #[tokio::test]
1707    async fn test_exit_staging_change_with_different_columns_fails() {
1708        let env = SchedulerEnv::new().await;
1709        let region = build_test_region(&env).await;
1710
1711        let mut manager = region.manifest_ctx.manifest_manager.write().await;
1712        region.set_staging(&mut manager).await.unwrap();
1713
1714        let mut changed_metadata = region.version().metadata.as_ref().clone();
1715        changed_metadata.column_metadatas.rotate_left(1);
1716
1717        manager
1718            .update(
1719                RegionMetaActionList::new(vec![
1720                    RegionMetaAction::Change(RegionChange {
1721                        metadata: Arc::new(changed_metadata),
1722                        sst_format: FormatType::PrimaryKey,
1723                        append_mode: None,
1724                    }),
1725                    RegionMetaAction::Edit(empty_edit()),
1726                ]),
1727                true,
1728            )
1729            .await
1730            .unwrap();
1731
1732        let result = region.exit_staging_on_success(&mut manager).await;
1733        assert!(matches!(result, Err(Error::Unexpected { .. })));
1734    }
1735
1736    #[tokio::test]
1737    async fn test_exit_staging_partition_expr_change_and_change_conflict_fails() {
1738        let env = SchedulerEnv::new().await;
1739        let region = build_test_region(&env).await;
1740
1741        let mut manager = region.manifest_ctx.manifest_manager.write().await;
1742        region.set_staging(&mut manager).await.unwrap();
1743
1744        let mut changed_metadata = region.version().metadata.as_ref().clone();
1745        changed_metadata.set_partition_expr(Some("expr_c".to_string()));
1746
1747        manager
1748            .update(
1749                RegionMetaActionList::new(vec![
1750                    RegionMetaAction::PartitionExprChange(RegionPartitionExprChange {
1751                        partition_expr: Some("expr_c".to_string()),
1752                    }),
1753                    RegionMetaAction::Change(RegionChange {
1754                        metadata: Arc::new(changed_metadata),
1755                        sst_format: FormatType::PrimaryKey,
1756                        append_mode: None,
1757                    }),
1758                    RegionMetaAction::Edit(empty_edit()),
1759                ]),
1760                true,
1761            )
1762            .await
1763            .unwrap();
1764
1765        let result = region.exit_staging_on_success(&mut manager).await;
1766        assert!(matches!(result, Err(Error::Unexpected { .. })));
1767    }
1768
1769    #[tokio::test]
1770    async fn test_set_region_state() {
1771        let env = SchedulerEnv::new().await;
1772        let builder = VersionControlBuilder::new();
1773        let version_control = Arc::new(builder.build());
1774        let manifest_ctx = env
1775            .mock_manifest_context(version_control.current().version.metadata.clone())
1776            .await;
1777
1778        let region_id = RegionId::new(1024, 0);
1779        // Leader -> Follower
1780        manifest_ctx.set_role(RegionRole::Follower, region_id);
1781        assert_eq!(manifest_ctx.state.load(), RegionRoleState::Follower);
1782
1783        // Follower -> Leader
1784        manifest_ctx.set_role(RegionRole::Leader, region_id);
1785        assert_eq!(
1786            manifest_ctx.state.load(),
1787            RegionRoleState::Leader(RegionLeaderState::Writable)
1788        );
1789
1790        // Direct Leader -> StagingLeader should be ignored.
1791        manifest_ctx.set_role(RegionRole::StagingLeader, region_id);
1792        assert_eq!(
1793            manifest_ctx.state.load(),
1794            RegionRoleState::Leader(RegionLeaderState::Writable)
1795        );
1796
1797        // Leader -> Downgrading Leader
1798        manifest_ctx.set_role(RegionRole::DowngradingLeader, region_id);
1799        assert_eq!(
1800            manifest_ctx.state.load(),
1801            RegionRoleState::Leader(RegionLeaderState::Downgrading)
1802        );
1803
1804        // Downgrading Leader -> Follower
1805        manifest_ctx.set_role(RegionRole::Follower, region_id);
1806        assert_eq!(manifest_ctx.state.load(), RegionRoleState::Follower);
1807
1808        // Can't downgrade from follower (Follower -> Downgrading Leader)
1809        manifest_ctx.set_role(RegionRole::DowngradingLeader, region_id);
1810        assert_eq!(manifest_ctx.state.load(), RegionRoleState::Follower);
1811
1812        // Set region role to Downgrading Leader
1813        manifest_ctx.set_role(RegionRole::Leader, region_id);
1814        manifest_ctx.set_role(RegionRole::DowngradingLeader, region_id);
1815        assert_eq!(
1816            manifest_ctx.state.load(),
1817            RegionRoleState::Leader(RegionLeaderState::Downgrading)
1818        );
1819
1820        // Downgrading Leader -> Leader
1821        manifest_ctx.set_role(RegionRole::Leader, region_id);
1822        assert_eq!(
1823            manifest_ctx.state.load(),
1824            RegionRoleState::Leader(RegionLeaderState::Writable)
1825        );
1826    }
1827
1828    #[tokio::test]
1829    async fn test_staging_state_validation() {
1830        let env = SchedulerEnv::new().await;
1831        let builder = VersionControlBuilder::new();
1832        let version_control = Arc::new(builder.build());
1833
1834        // Create context with staging state using the correct pattern from SchedulerEnv
1835        let staging_ctx = {
1836            let manager = RegionManifestManager::new(
1837                version_control.current().version.metadata.clone(),
1838                0,
1839                RegionManifestOptions {
1840                    manifest_dir: "".to_string(),
1841                    object_store: env.access_layer.object_store().clone(),
1842                    compress_type: CompressionType::Uncompressed,
1843                    checkpoint_distance: 10,
1844                    remove_file_options: Default::default(),
1845                    manifest_cache: None,
1846                },
1847                FormatType::PrimaryKey,
1848                &Default::default(),
1849            )
1850            .await
1851            .unwrap();
1852            Arc::new(ManifestContext::new(
1853                manager,
1854                RegionRoleState::Leader(RegionLeaderState::Staging),
1855            ))
1856        };
1857
1858        // Test staging state behavior
1859        assert_eq!(
1860            staging_ctx.current_state(),
1861            RegionRoleState::Leader(RegionLeaderState::Staging)
1862        );
1863
1864        // Test writable context for comparison
1865        let writable_ctx = env
1866            .mock_manifest_context(version_control.current().version.metadata.clone())
1867            .await;
1868
1869        assert_eq!(
1870            writable_ctx.current_state(),
1871            RegionRoleState::Leader(RegionLeaderState::Writable)
1872        );
1873    }
1874
1875    #[tokio::test]
1876    async fn test_staging_state_transitions() {
1877        let builder = VersionControlBuilder::new();
1878        let version_control = Arc::new(builder.build());
1879        let metadata = version_control.current().version.metadata.clone();
1880
1881        // Create MitoRegion for testing state transitions
1882        let temp_dir = create_temp_dir("");
1883        let path_str = temp_dir.path().display().to_string();
1884        let fs_builder = Fs::default().root(&path_str);
1885        let object_store = ObjectStore::new(fs_builder).unwrap().finish();
1886
1887        let index_aux_path = temp_dir.path().join("index_aux");
1888        let puffin_mgr = PuffinManagerFactory::new(&index_aux_path, 4096, None, None)
1889            .await
1890            .unwrap();
1891        let intm_mgr = IntermediateManager::init_fs(index_aux_path.to_str().unwrap())
1892            .await
1893            .unwrap();
1894
1895        let access_layer = Arc::new(AccessLayer::new(
1896            "",
1897            PathType::Bare,
1898            object_store,
1899            puffin_mgr,
1900            intm_mgr,
1901        ));
1902
1903        let manager = RegionManifestManager::new(
1904            metadata.clone(),
1905            0,
1906            RegionManifestOptions {
1907                manifest_dir: "".to_string(),
1908                object_store: access_layer.object_store().clone(),
1909                compress_type: CompressionType::Uncompressed,
1910                checkpoint_distance: 10,
1911                remove_file_options: Default::default(),
1912                manifest_cache: None,
1913            },
1914            FormatType::PrimaryKey,
1915            &Default::default(),
1916        )
1917        .await
1918        .unwrap();
1919
1920        let manifest_ctx = Arc::new(ManifestContext::new(
1921            manager,
1922            RegionRoleState::Leader(RegionLeaderState::Writable),
1923        ));
1924
1925        let region = MitoRegion {
1926            region_id: metadata.region_id,
1927            version_control,
1928            access_layer,
1929            manifest_ctx: manifest_ctx.clone(),
1930            file_purger: crate::test_util::new_noop_file_purger(),
1931            provider: Provider::noop_provider(),
1932            last_flush_millis: Default::default(),
1933            last_compaction_millis: Default::default(),
1934            time_provider: Arc::new(StdTimeProvider),
1935            topic_latest_entry_id: Default::default(),
1936            written_bytes: Arc::new(AtomicU64::new(0)),
1937            stats: ManifestStats::default(),
1938        };
1939
1940        // Test initial state
1941        assert_eq!(
1942            region.state(),
1943            RegionRoleState::Leader(RegionLeaderState::Writable)
1944        );
1945        assert!(!region.is_staging());
1946
1947        // Test transition to staging
1948        let mut manager = manifest_ctx.manifest_manager.write().await;
1949        region.set_staging(&mut manager).await.unwrap();
1950        drop(manager);
1951        assert_eq!(
1952            region.state(),
1953            RegionRoleState::Leader(RegionLeaderState::Staging)
1954        );
1955        assert!(region.is_staging());
1956
1957        // Test transition back to writable
1958        region.exit_staging().unwrap();
1959        assert_eq!(
1960            region.state(),
1961            RegionRoleState::Leader(RegionLeaderState::Writable)
1962        );
1963        assert!(!region.is_staging());
1964
1965        // Test staging directory cleanup: Create dirty staging files before entering staging mode
1966        {
1967            // Create some dummy staging manifest files to simulate an interrupted session
1968            let manager = manifest_ctx.manifest_manager.write().await;
1969            let dummy_actions = RegionMetaActionList::new(vec![]);
1970            let dummy_bytes = dummy_actions.encode().unwrap();
1971
1972            // Create dirty staging files with versions 100 and 101
1973            manager.store().save(100, &dummy_bytes, true).await.unwrap();
1974            manager.store().save(101, &dummy_bytes, true).await.unwrap();
1975            drop(manager);
1976
1977            // Verify dirty files exist before entering staging
1978            let manager = manifest_ctx.manifest_manager.read().await;
1979            let dirty_manifests = manager.store().fetch_staging_manifests().await.unwrap();
1980            assert_eq!(
1981                dirty_manifests.len(),
1982                2,
1983                "Should have 2 dirty staging files"
1984            );
1985            drop(manager);
1986
1987            // Enter staging mode - this should clean up the dirty files
1988            let mut manager = manifest_ctx.manifest_manager.write().await;
1989            region.set_staging(&mut manager).await.unwrap();
1990            drop(manager);
1991
1992            // Verify dirty files are cleaned up after entering staging
1993            let manager = manifest_ctx.manifest_manager.read().await;
1994            let cleaned_manifests = manager.store().fetch_staging_manifests().await.unwrap();
1995            assert_eq!(
1996                cleaned_manifests.len(),
1997                0,
1998                "Dirty staging files should be cleaned up"
1999            );
2000            drop(manager);
2001
2002            // Exit staging to restore normal state for remaining tests
2003            region.exit_staging().unwrap();
2004        }
2005
2006        // Test invalid transitions
2007        let mut manager = manifest_ctx.manifest_manager.write().await;
2008        assert!(region.set_staging(&mut manager).await.is_ok()); // Writable -> Staging should work
2009        drop(manager);
2010        let mut manager = manifest_ctx.manifest_manager.write().await;
2011        assert!(region.set_staging(&mut manager).await.is_err()); // Staging -> Staging should fail
2012        drop(manager);
2013        assert!(region.exit_staging().is_ok()); // Staging -> Writable should work
2014        assert!(region.exit_staging().is_err()); // Writable -> Writable should fail
2015    }
2016}