mito2/region.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Mito region.

pub mod catchup;
pub mod opener;
pub mod options;
pub mod utils;
pub(crate) mod version;

use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
use std::sync::{Arc, Mutex, RwLock};

use common_base::hash::partition_expr_version;
use common_telemetry::{error, info, warn};
use crossbeam_utils::atomic::AtomicCell;
use partition::expr::PartitionExpr;
use snafu::{OptionExt, ResultExt, ensure};
use store_api::ManifestVersion;
use store_api::codec::PrimaryKeyEncoding;
use store_api::logstore::provider::Provider;
use store_api::metadata::RegionMetadataRef;
use store_api::region_engine::{
    RegionManifestInfo, RegionRole, RegionStatistic, SettableRegionRoleState,
};
use store_api::region_request::{PathType, StagingPartitionDirective};
use store_api::sst_entry::ManifestSstEntry;
use store_api::storage::{FileId, RegionId, SequenceNumber};
use tokio::sync::RwLockWriteGuard;
pub use utils::*;

use crate::access_layer::AccessLayerRef;
use crate::error::{
    InvalidPartitionExprSnafu, RegionNotFoundSnafu, RegionStateSnafu, RegionTruncatedSnafu, Result,
    UnexpectedSnafu, UpdateManifestSnafu,
};
use crate::manifest::action::{
    RegionChange, RegionManifest, RegionMetaAction, RegionMetaActionList,
};
use crate::manifest::manager::RegionManifestManager;
use crate::region::version::{VersionControlRef, VersionRef};
use crate::request::{OnFailure, OptionOutputTx};
use crate::sst::file::FileMeta;
use crate::sst::file_purger::FilePurgerRef;
use crate::sst::location::{index_file_path, sst_file_path};
use crate::time_provider::TimeProviderRef;

/// Approximate factor used to estimate the WAL size from the memtable usage.
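/// For example, a memtable usage of 100 MiB yields an estimated WAL size of
/// roughly 42.8 MiB (100 MiB * 0.42825).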
const ESTIMATED_WAL_FACTOR: f32 = 0.42825;

/// Region usage includes the region id, WAL usage, SST usage, and manifest usage.
#[derive(Debug)]
pub struct RegionUsage {
    pub region_id: RegionId,
    pub wal_usage: u64,
    pub sst_usage: u64,
    pub manifest_usage: u64,
}

impl RegionUsage {
    pub fn disk_usage(&self) -> u64 {
        self.wal_usage + self.sst_usage + self.manifest_usage
    }
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RegionLeaderState {
    /// The region is opened and is writable.
    Writable,
    /// The region is in staging mode - writable but no checkpoint/compaction.
    Staging,
    /// The region is entering staging mode; write requests will be stalled.
    EnteringStaging,
    /// The region is altering.
    Altering,
    /// The region is dropping.
    Dropping,
    /// The region is truncating.
    Truncating,
    /// The region is handling a region edit.
    Editing,
    /// The region is stepping down.
    Downgrading,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RegionRoleState {
    Leader(RegionLeaderState),
    Follower,
}

impl RegionRoleState {
    /// Converts the region role state to leader state if it is a leader state.
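    ///
    /// A minimal sketch of the mapping (illustrative, not a doctest):
    /// ```ignore
    /// let state = RegionRoleState::Leader(RegionLeaderState::Writable);
    /// assert_eq!(state.into_leader_state(), Some(RegionLeaderState::Writable));
    /// assert_eq!(RegionRoleState::Follower.into_leader_state(), None);
    /// ```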
    pub fn into_leader_state(self) -> Option<RegionLeaderState> {
        match self {
            RegionRoleState::Leader(leader_state) => Some(leader_state),
            RegionRoleState::Follower => None,
        }
    }
}

/// Metadata and runtime status of a region.
///
/// Writing and reading a region follow a single-writer-multi-reader rule:
/// - Only the region worker thread this region belongs to can modify the metadata.
/// - Multiple reader threads are allowed to read a specific `version` of a region.
#[derive(Debug)]
pub struct MitoRegion {
    /// Id of this region.
    ///
    /// Accessing region id from the version control is inconvenient so
    /// we also store it here.
    pub(crate) region_id: RegionId,

    /// Version controller for this region.
    ///
    /// We MUST update the version control inside the write lock of the region manifest manager.
    pub(crate) version_control: VersionControlRef,
    /// SSTs accessor for this region.
    pub(crate) access_layer: AccessLayerRef,
    /// Context to maintain manifest for this region.
    pub(crate) manifest_ctx: ManifestContextRef,
    /// SST file purger.
    pub(crate) file_purger: FilePurgerRef,
    /// The provider of log store.
    pub(crate) provider: Provider,
    /// Last flush time in millis.
    last_flush_millis: AtomicI64,
    /// Last compaction time in millis.
    last_compaction_millis: AtomicI64,
    /// Provider to get current time.
    time_provider: TimeProviderRef,
    /// The topic's latest entry id as of the region's last flush.
    /// **Only used for remote WAL pruning.**
    ///
    /// The value is updated to the latest offset of the topic
    /// when the region receives a flush request or schedules a periodic flush task
    /// while the region's memtable is empty.
    ///
    /// There are no WAL entries in the range [flushed_entry_id, topic_latest_entry_id] for the
    /// current region, so the WAL may be pruned up to `topic_latest_entry_id`.
    pub(crate) topic_latest_entry_id: AtomicU64,
    /// The total bytes written to the region.
    pub(crate) written_bytes: Arc<AtomicU64>,
    /// Partition info of the region in staging mode.
    ///
    /// During the staging mode, the region metadata in [`VersionControlRef`] is not updated,
    /// so we need to store the partition info separately.
    pub(crate) staging_partition_info: Mutex<Option<StagingPartitionInfo>>,
    /// Manifest stats.
    stats: ManifestStats,
}

pub type MitoRegionRef = Arc<MitoRegion>;

#[derive(Debug, Clone)]
pub(crate) struct StagingPartitionInfo {
    pub(crate) partition_directive: StagingPartitionDirective,
    pub(crate) partition_rule_version: u64,
}

impl StagingPartitionInfo {
    /// Returns the partition expression carried by the staging directive, if any.
    pub(crate) fn partition_expr(&self) -> Option<&str> {
        self.partition_directive.partition_expr()
    }

    /// Builds staging partition info from a directive and derives its version marker.
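    ///
    /// A minimal sketch of the derived version (illustrative, not a doctest):
    /// ```ignore
    /// // `RejectAllWrites` carries no expression, so the version marker is 0.
    /// let info = StagingPartitionInfo::from_partition_directive(
    ///     StagingPartitionDirective::RejectAllWrites,
    /// );
    /// assert_eq!(info.partition_rule_version, 0);
    /// ```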
    pub(crate) fn from_partition_directive(partition_directive: StagingPartitionDirective) -> Self {
        let partition_rule_version = match &partition_directive {
            StagingPartitionDirective::UpdatePartitionExpr(expr) => {
                partition_expr_version(Some(expr))
            }
            StagingPartitionDirective::RejectAllWrites => 0,
        };
        Self {
            partition_directive,
            partition_rule_version,
        }
    }
}

impl MitoRegion {
    /// Stop background managers for this region.
    pub(crate) async fn stop(&self) {
        self.manifest_ctx
            .manifest_manager
            .write()
            .await
            .stop()
            .await;

        info!(
            "Stopped region manifest manager, region_id: {}",
            self.region_id
        );
    }

    /// Returns current metadata of the region.
    pub fn metadata(&self) -> RegionMetadataRef {
        let version_data = self.version_control.current();
        version_data.version.metadata.clone()
    }

    /// Returns primary key encoding of the region.
    pub(crate) fn primary_key_encoding(&self) -> PrimaryKeyEncoding {
        let version_data = self.version_control.current();
        version_data.version.metadata.primary_key_encoding
    }

    /// Returns current version of the region.
    pub(crate) fn version(&self) -> VersionRef {
        let version_data = self.version_control.current();
        version_data.version
    }

    /// Returns last flush timestamp in millis.
    pub(crate) fn last_flush_millis(&self) -> i64 {
        self.last_flush_millis.load(Ordering::Relaxed)
    }

    /// Update flush time to current time.
    pub(crate) fn update_flush_millis(&self) {
        let now = self.time_provider.current_time_millis();
        self.last_flush_millis.store(now, Ordering::Relaxed);
    }

    /// Returns last compaction timestamp in millis.
    pub(crate) fn last_compaction_millis(&self) -> i64 {
        self.last_compaction_millis.load(Ordering::Relaxed)
    }

    /// Update compaction time to current time.
    pub(crate) fn update_compaction_millis(&self) {
        let now = self.time_provider.current_time_millis();
        self.last_compaction_millis.store(now, Ordering::Relaxed);
    }

    /// Returns the table dir.
    pub(crate) fn table_dir(&self) -> &str {
        self.access_layer.table_dir()
    }

    /// Returns the path type of the region.
    pub(crate) fn path_type(&self) -> PathType {
        self.access_layer.path_type()
    }

    /// Returns whether the region is writable.
    pub(crate) fn is_writable(&self) -> bool {
        matches!(
            self.manifest_ctx.state.load(),
            RegionRoleState::Leader(RegionLeaderState::Writable)
                | RegionRoleState::Leader(RegionLeaderState::Staging)
        )
    }

    /// Returns whether the region is flushable.
    pub(crate) fn is_flushable(&self) -> bool {
        matches!(
            self.manifest_ctx.state.load(),
            RegionRoleState::Leader(RegionLeaderState::Writable)
                | RegionRoleState::Leader(RegionLeaderState::Staging)
                | RegionRoleState::Leader(RegionLeaderState::Downgrading)
        )
    }

    /// Returns whether the region should abort index building.
    pub(crate) fn should_abort_index(&self) -> bool {
        matches!(
            self.manifest_ctx.state.load(),
            RegionRoleState::Follower
                | RegionRoleState::Leader(RegionLeaderState::Dropping)
                | RegionRoleState::Leader(RegionLeaderState::Truncating)
                | RegionRoleState::Leader(RegionLeaderState::Downgrading)
                | RegionRoleState::Leader(RegionLeaderState::Staging)
        )
    }

    /// Returns whether the region is downgrading.
    pub(crate) fn is_downgrading(&self) -> bool {
        matches!(
            self.manifest_ctx.state.load(),
            RegionRoleState::Leader(RegionLeaderState::Downgrading)
        )
    }

    /// Returns whether the region is in staging mode.
    pub(crate) fn is_staging(&self) -> bool {
        self.manifest_ctx.state.load() == RegionRoleState::Leader(RegionLeaderState::Staging)
    }

    /// Returns whether the region is entering staging mode.
    pub(crate) fn is_enter_staging(&self) -> bool {
        self.manifest_ctx.state.load()
            == RegionRoleState::Leader(RegionLeaderState::EnteringStaging)
    }

    pub fn region_id(&self) -> RegionId {
        self.region_id
    }

    pub fn find_committed_sequence(&self) -> SequenceNumber {
        self.version_control.committed_sequence()
    }

    /// Returns whether the region is a follower (readonly).
    pub fn is_follower(&self) -> bool {
        self.manifest_ctx.state.load() == RegionRoleState::Follower
    }

    /// Returns the state of the region.
    pub(crate) fn state(&self) -> RegionRoleState {
        self.manifest_ctx.state.load()
    }

    /// Sets the region role state.
    pub(crate) fn set_role(&self, next_role: RegionRole) {
        self.manifest_ctx.set_role(next_role, self.region_id);
    }

    /// Sets the altering state.
    /// You should call this method in the worker loop.
    pub(crate) fn set_altering(&self) -> Result<()> {
        self.compare_exchange_state(
            RegionLeaderState::Writable,
            RegionRoleState::Leader(RegionLeaderState::Altering),
        )
    }

    /// Sets the dropping state.
    /// You should call this method in the worker loop.
    pub(crate) fn set_dropping(&self, expect: RegionLeaderState) -> Result<()> {
        self.compare_exchange_state(expect, RegionRoleState::Leader(RegionLeaderState::Dropping))
    }

    /// Sets the truncating state.
    /// You should call this method in the worker loop.
    pub(crate) fn set_truncating(&self) -> Result<()> {
        self.compare_exchange_state(
            RegionLeaderState::Writable,
            RegionRoleState::Leader(RegionLeaderState::Truncating),
        )
    }

    /// Sets the editing state.
    /// You should call this method in the worker loop.
    pub(crate) fn set_editing(&self, expect: RegionLeaderState) -> Result<()> {
        self.compare_exchange_state(expect, RegionRoleState::Leader(RegionLeaderState::Editing))
    }

    /// Sets the staging state.
    ///
    /// You should call this method in the worker loop.
    /// Transitions from Writable to Staging state.
    /// Cleans any existing staging manifests before entering staging mode.
    pub(crate) async fn set_staging(
        &self,
        manager: &mut RwLockWriteGuard<'_, RegionManifestManager>,
    ) -> Result<()> {
        manager.store().clear_staging_manifests().await?;

        self.compare_exchange_state(
            RegionLeaderState::Writable,
            RegionRoleState::Leader(RegionLeaderState::Staging),
        )
    }

    /// Sets the entering staging state.
    pub(crate) fn set_entering_staging(&self) -> Result<()> {
        self.compare_exchange_state(
            RegionLeaderState::Writable,
            RegionRoleState::Leader(RegionLeaderState::EnteringStaging),
        )
    }

    /// Exits the staging state back to writable.
    ///
    /// You should call this method in the worker loop.
    /// Transitions from Staging to Writable state.
    pub fn exit_staging(&self) -> Result<()> {
        *self.staging_partition_info.lock().unwrap() = None;
        self.compare_exchange_state(
            RegionLeaderState::Staging,
            RegionRoleState::Leader(RegionLeaderState::Writable),
        )
    }

    /// Sets the region role state gracefully. This acquires the manifest write lock.
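    ///
    /// A sketch of a call site (illustrative; assumes a `region: &MitoRegion` in scope):
    /// ```ignore
    /// // Entering staging mode is only allowed from the writable leader state.
    /// region
    ///     .set_role_state_gracefully(SettableRegionRoleState::StagingLeader)
    ///     .await?;
    /// ```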
    pub(crate) async fn set_role_state_gracefully(
        &self,
        state: SettableRegionRoleState,
    ) -> Result<()> {
        let mut manager: RwLockWriteGuard<'_, RegionManifestManager> =
            self.manifest_ctx.manifest_manager.write().await;
        let current_state = self.state();

        match state {
            SettableRegionRoleState::Leader => {
                // Exit staging mode and return to normal writable leader
                // Only allowed from staging state
                match current_state {
                    RegionRoleState::Leader(RegionLeaderState::Staging) => {
                        info!("Exiting staging mode for region {}", self.region_id);
                        // Use the success exit path that merges all staged manifests
                        self.exit_staging_on_success(&mut manager).await?;
                    }
                    RegionRoleState::Leader(RegionLeaderState::Writable) => {
                        // Already in desired state - no-op
                        info!("Region {} already in normal leader mode", self.region_id);
                    }
                    _ => {
                        // Only staging -> leader transition is allowed
                        return Err(RegionStateSnafu {
                            region_id: self.region_id,
                            state: current_state,
                            expect: RegionRoleState::Leader(RegionLeaderState::Staging),
                        }
                        .build());
                    }
                }
            }

            SettableRegionRoleState::StagingLeader => {
                // Enter staging mode from normal writable leader
                // Only allowed from writable leader state
                match current_state {
                    RegionRoleState::Leader(RegionLeaderState::Writable) => {
                        info!("Entering staging mode for region {}", self.region_id);
                        self.set_staging(&mut manager).await?;
                    }
                    RegionRoleState::Leader(RegionLeaderState::Staging) => {
                        // Already in desired state - no-op
                        info!("Region {} already in staging mode", self.region_id);
                    }
                    _ => {
                        return Err(RegionStateSnafu {
                            region_id: self.region_id,
                            state: current_state,
                            expect: RegionRoleState::Leader(RegionLeaderState::Writable),
                        }
                        .build());
                    }
                }
            }

            SettableRegionRoleState::Follower => {
                // Make this region a follower
                match current_state {
                    RegionRoleState::Leader(RegionLeaderState::Staging) => {
                        info!(
                            "Exiting staging and demoting region {} to follower",
                            self.region_id
                        );
                        self.exit_staging()?;
                        self.set_role(RegionRole::Follower);
                    }
                    RegionRoleState::Leader(_) => {
                        info!("Demoting region {} from leader to follower", self.region_id);
                        self.set_role(RegionRole::Follower);
                    }
                    RegionRoleState::Follower => {
                        // Already in desired state - no-op
                        info!("Region {} already in follower mode", self.region_id);
                    }
                }
            }

            SettableRegionRoleState::DowngradingLeader => {
                // Downgrade this region to downgrading leader
                match current_state {
                    RegionRoleState::Leader(RegionLeaderState::Staging) => {
                        info!(
                            "Exiting staging and entering downgrade for region {}",
                            self.region_id
                        );
                        self.exit_staging()?;
                        self.set_role(RegionRole::DowngradingLeader);
                    }
                    RegionRoleState::Leader(RegionLeaderState::Writable) => {
                        info!("Starting downgrade for region {}", self.region_id);
                        self.set_role(RegionRole::DowngradingLeader);
                    }
                    RegionRoleState::Leader(RegionLeaderState::Downgrading) => {
                        // Already in desired state - no-op
                        info!("Region {} already in downgrading mode", self.region_id);
                    }
                    _ => {
                        warn!(
                            "Cannot start downgrade for region {} from state {:?}",
                            self.region_id, current_state
                        );
                    }
                }
            }
        }

        // Hack(zhongzc): If we have just become leader (writable), persist any backfilled metadata.
        if self.state() == RegionRoleState::Leader(RegionLeaderState::Writable) {
            // Persist backfilled metadata if manifest is missing fields (e.g., partition_expr)
            let manifest_meta = &manager.manifest().metadata;
            let current_version = self.version();
            let current_meta = &current_version.metadata;
            if manifest_meta.partition_expr.is_none() && current_meta.partition_expr.is_some() {
                let action = RegionMetaAction::Change(RegionChange {
                    metadata: current_meta.clone(),
                    sst_format: current_version.options.sst_format.unwrap_or_default(),
                    append_mode: None,
                });
                let result = manager
                    .update(RegionMetaActionList::with_action(action), false)
                    .await;

                match result {
                    Ok(version) => {
                        info!(
                            "Successfully persisted backfilled metadata for region {}, version: {}",
                            self.region_id, version
                        );
                    }
                    Err(e) => {
                        warn!(e; "Failed to persist backfilled metadata for region {}", self.region_id);
                    }
                }
            }
        }

        drop(manager);

        Ok(())
    }

    /// Switches the region state to `RegionRoleState::Leader(RegionLeaderState::Writable)` if the current state is `expect`.
    /// Otherwise, logs an error.
    pub(crate) fn switch_state_to_writable(&self, expect: RegionLeaderState) {
        if let Err(e) = self
            .compare_exchange_state(expect, RegionRoleState::Leader(RegionLeaderState::Writable))
        {
            error!(e; "failed to switch region state to writable, expect state is {:?}", expect);
        }
    }

    /// Switches the region state to `RegionRoleState::Leader(RegionLeaderState::Staging)` if the current state is `expect`.
    /// Otherwise, logs an error.
    pub(crate) fn switch_state_to_staging(&self, expect: RegionLeaderState) {
        if let Err(e) =
            self.compare_exchange_state(expect, RegionRoleState::Leader(RegionLeaderState::Staging))
        {
            error!(e; "failed to switch region state to staging, expect state is {:?}", expect);
        }
    }

    /// Returns the region statistic.
    pub(crate) fn region_statistic(&self) -> RegionStatistic {
        let version = self.version();
        let memtables = &version.memtables;
        let memtable_usage = (memtables.mutable_usage() + memtables.immutables_usage()) as u64;

        let sst_usage = version.ssts.sst_usage();
        let index_usage = version.ssts.index_usage();
        let flushed_entry_id = version.flushed_entry_id;

        let wal_usage = self.estimated_wal_usage(memtable_usage);
        let manifest_usage = self.stats.total_manifest_size();
        let num_rows = version.ssts.num_rows() + version.memtables.num_rows();
        let num_files = version.ssts.num_files();
        let manifest_version = self.stats.manifest_version();
        let file_removed_cnt = self.stats.file_removed_cnt();

        let topic_latest_entry_id = self.topic_latest_entry_id.load(Ordering::Relaxed);
        let written_bytes = self.written_bytes.load(Ordering::Relaxed);

        RegionStatistic {
            num_rows,
            memtable_size: memtable_usage,
            wal_size: wal_usage,
            manifest_size: manifest_usage,
            sst_size: sst_usage,
            sst_num: num_files,
            index_size: index_usage,
            manifest: RegionManifestInfo::Mito {
                manifest_version,
                flushed_entry_id,
                file_removed_cnt,
            },
            data_topic_latest_entry_id: topic_latest_entry_id,
            metadata_topic_latest_entry_id: topic_latest_entry_id,
            written_bytes,
        }
    }

    /// Estimated WAL size in bytes.
    /// Uses the memtable usage to estimate the WAL size.
    fn estimated_wal_usage(&self, memtable_usage: u64) -> u64 {
        ((memtable_usage as f32) * ESTIMATED_WAL_FACTOR) as u64
    }

    /// Sets the state of the region to the given state if the current state equals
    /// the expected one.
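    ///
    /// A sketch of a transition (illustrative, not a doctest):
    /// ```ignore
    /// // Succeeds only if the region is still in the writable leader state.
    /// region.compare_exchange_state(
    ///     RegionLeaderState::Writable,
    ///     RegionRoleState::Leader(RegionLeaderState::Altering),
    /// )?;
    /// ```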
    fn compare_exchange_state(
        &self,
        expect: RegionLeaderState,
        state: RegionRoleState,
    ) -> Result<()> {
        self.manifest_ctx
            .state
            .compare_exchange(RegionRoleState::Leader(expect), state)
            .map_err(|actual| {
                RegionStateSnafu {
                    region_id: self.region_id,
                    state: actual,
                    expect: RegionRoleState::Leader(expect),
                }
                .build()
            })?;
        Ok(())
    }

    pub fn access_layer(&self) -> AccessLayerRef {
        self.access_layer.clone()
    }

    /// Returns the SST entries of the region.
    pub async fn manifest_sst_entries(&self) -> Vec<ManifestSstEntry> {
        let table_dir = self.table_dir();
        let path_type = self.access_layer.path_type();

        let visible_ssts = self
            .version()
            .ssts
            .levels()
            .iter()
            .flat_map(|level| level.files().map(|file| file.file_id().file_id()))
            .collect::<HashSet<_>>();

        let manifest_files = self.manifest_ctx.manifest().await.files.clone();
        let staging_files = self
            .manifest_ctx
            .staging_manifest()
            .await
            .map(|m| m.files.clone())
            .unwrap_or_default();
        let files = manifest_files
            .into_iter()
            .chain(staging_files.into_iter())
            .collect::<HashMap<_, _>>();

        files
            .values()
            .map(|meta| {
                let region_id = self.region_id;
                let origin_region_id = meta.region_id;
                let (index_version, index_file_path, index_file_size) = if meta.index_file_size > 0
                {
                    let index_file_path = index_file_path(table_dir, meta.index_id(), path_type);
                    (
                        meta.index_version,
                        Some(index_file_path),
                        Some(meta.index_file_size),
                    )
                } else {
                    (0, None, None)
                };
                let visible = visible_ssts.contains(&meta.file_id);
                ManifestSstEntry {
                    table_dir: table_dir.to_string(),
                    region_id,
                    table_id: region_id.table_id(),
                    region_number: region_id.region_number(),
                    region_group: region_id.region_group(),
                    region_sequence: region_id.region_sequence(),
                    file_id: meta.file_id.to_string(),
                    index_version,
                    level: meta.level,
                    file_path: sst_file_path(table_dir, meta.file_id(), path_type),
                    file_size: meta.file_size,
                    index_file_path,
                    index_file_size,
                    num_rows: meta.num_rows,
                    num_row_groups: meta.num_row_groups,
                    num_series: Some(meta.num_series),
                    min_ts: meta.time_range.0,
                    max_ts: meta.time_range.1,
                    sequence: meta.sequence.map(|s| s.get()),
                    origin_region_id,
                    node_id: None,
                    visible,
                }
            })
            .collect()
    }

    /// Returns the file metas of the region by file ids.
    pub async fn file_metas(&self, file_ids: &[FileId]) -> Vec<Option<FileMeta>> {
        let manifest_files = self.manifest_ctx.manifest().await.files.clone();

        file_ids
            .iter()
            .map(|file_id| manifest_files.get(file_id).cloned())
            .collect::<Vec<_>>()
    }

    /// Exit staging mode successfully by merging all staged manifests and making them visible.
    pub(crate) async fn exit_staging_on_success(
        &self,
        manager: &mut RwLockWriteGuard<'_, RegionManifestManager>,
    ) -> Result<()> {
        let current_state = self.manifest_ctx.current_state();
        ensure!(
            current_state == RegionRoleState::Leader(RegionLeaderState::Staging),
            RegionStateSnafu {
                region_id: self.region_id,
                state: current_state,
                expect: RegionRoleState::Leader(RegionLeaderState::Staging),
            }
        );

        // Merge all staged manifest actions
        let merged_actions = match manager.merge_staged_actions(current_state).await? {
            Some(actions) => actions,
            None => {
                info!(
                    "No staged manifests to merge for region {}, exiting staging mode without changes",
                    self.region_id
                );
                // Even if no manifests to merge, we still need to exit staging mode
                self.exit_staging()?;
                return Ok(());
            }
        };
        let expect_change = merged_actions.actions.iter().any(|a| a.is_change());
        let expect_partition_expr_change = merged_actions
            .actions
            .iter()
            .any(|a| a.is_partition_expr_change());
        let expect_edit = merged_actions.actions.iter().any(|a| a.is_edit());
        ensure!(
            !(expect_change && expect_partition_expr_change),
            UnexpectedSnafu {
                reason: "unexpected both change and partition expr change actions in merged actions"
            }
        );
        ensure!(
            expect_change || expect_partition_expr_change,
            UnexpectedSnafu {
                reason: "expect a change or partition expr change action in merged actions"
            }
        );
        ensure!(
            expect_edit,
            UnexpectedSnafu {
                reason: "expect an edit action in merged actions"
            }
        );

        let (merged_partition_expr_change, merged_change, merged_edit) =
            merged_actions.clone().split_region_change_and_edit();
        if let Some(change) = &merged_change {
            // In staging exit we only allow metadata-only updates. A `Change`
            // action is accepted only when column definitions are unchanged;
            // otherwise it is treated as a schema change and rejected.
            let current_column_metadatas = &self.version().metadata.column_metadatas;
            ensure!(
                change.metadata.column_metadatas == *current_column_metadatas,
                UnexpectedSnafu {
                    reason: "change action alters column metadata in staging exit"
                }
            );
        }

        // Submit merged actions using the manifest manager's update method
        // Pass `false` so it saves to the normal directory, not the staging one
        let new_version = manager.update(merged_actions, false).await?;
        info!(
            "Successfully submitted merged staged manifests for region {}, new version: {}",
            self.region_id, new_version
        );

        // Apply the merged changes to in-memory version control
        if let Some(change) = merged_partition_expr_change {
            let mut new_metadata = self.version().metadata.as_ref().clone();
            new_metadata.set_partition_expr(change.partition_expr);
            self.version_control.alter_metadata(new_metadata.into());
        }
        if let Some(change) = merged_change {
            self.version_control.alter_metadata(change.metadata);
        }
        self.version_control
            .apply_edit(Some(merged_edit), &[], self.file_purger.clone());

        // Clear all staging manifests and transition the state
        if let Err(e) = manager.clear_staging_manifest_and_dir().await {
            error!(e; "Failed to clear staging manifest dir for region {}", self.region_id);
        }
        self.exit_staging()?;

        Ok(())
    }

    /// Returns the partition expression string for this region.
    ///
    /// If the region is currently in staging state, this returns the partition expression held in
    /// the staging partition field. Otherwise, it returns the partition expression from the primary
    /// region metadata (current committed version).
    pub fn maybe_staging_partition_expr_str(&self) -> Option<String> {
        let is_staging = self.is_staging();
        if is_staging {
            let staging_partition_info = self.staging_partition_info.lock().unwrap();
            if staging_partition_info.is_none() {
                warn!(
                    "Staging partition expr is none for region {} in staging state",
                    self.region_id
                );
            }
            staging_partition_info
                .as_ref()
                .and_then(|info| info.partition_expr().map(ToString::to_string))
        } else {
            let version = self.version();
            version.metadata.partition_expr.clone()
        }
    }

    pub fn expected_partition_expr_version(&self) -> u64 {
        if self.is_staging() {
            let staging_partition_info = self.staging_partition_info.lock().unwrap();
            staging_partition_info
                .as_ref()
                .map(|info| info.partition_rule_version)
                .unwrap_or_default()
        } else {
            self.version().metadata.partition_expr_version
        }
    }

    /// Returns whether writes should be rejected for this region in staging mode.
    pub(crate) fn reject_all_writes_in_staging(&self) -> bool {
        if !self.is_staging() {
            return false;
        }
        let staging_partition_info = self.staging_partition_info.lock().unwrap();
        staging_partition_info
            .as_ref()
            .map(|info| {
                matches!(
                    info.partition_directive,
                    StagingPartitionDirective::RejectAllWrites
                )
            })
            .unwrap_or(false)
    }
}

/// Context to update the region manifest.
#[derive(Debug)]
pub(crate) struct ManifestContext {
    /// Manager to maintain manifest for this region.
    pub(crate) manifest_manager: tokio::sync::RwLock<RegionManifestManager>,
    /// The state of the region. The region checks the state before updating
    /// the manifest.
    state: AtomicCell<RegionRoleState>,
}

impl ManifestContext {
    pub(crate) fn new(manager: RegionManifestManager, state: RegionRoleState) -> Self {
        ManifestContext {
            manifest_manager: tokio::sync::RwLock::new(manager),
            state: AtomicCell::new(state),
        }
    }

    pub(crate) async fn manifest_version(&self) -> ManifestVersion {
        self.manifest_manager
            .read()
            .await
            .manifest()
            .manifest_version
    }

    pub(crate) async fn has_update(&self) -> Result<bool> {
        self.manifest_manager.read().await.has_update().await
    }

    /// Returns the current region role state.
    pub(crate) fn current_state(&self) -> RegionRoleState {
        self.state.load()
    }

    /// Installs the manifest changes from the current version to the target version (inclusive).
    ///
    /// Returns installed [RegionManifest].
    /// **Note**: This function is not guaranteed to install the target version strictly.
    /// The installed version may be greater than the target version.
    pub(crate) async fn install_manifest_to(
        &self,
        version: ManifestVersion,
    ) -> Result<Arc<RegionManifest>> {
        let mut manager = self.manifest_manager.write().await;
        manager.install_manifest_to(version).await?;

        Ok(manager.manifest())
    }

    /// Updates the manifest if current state is `expect_state`.
    pub(crate) async fn update_manifest(
        &self,
        expect_state: RegionLeaderState,
        action_list: RegionMetaActionList,
        is_staging: bool,
    ) -> Result<ManifestVersion> {
        // Acquires the write lock of the manifest manager.
        let mut manager = self.manifest_manager.write().await;
        // Gets current manifest.
        let manifest = manager.manifest();
        // Checks state inside the lock. This is to ensure that we won't update the manifest
        // after `set_role_state_gracefully()` is called.
        let current_state = self.state.load();

        // If expect_state is not downgrading, the current state must be either `expect_state` or downgrading.
        //
        // A downgrading leader rejects user writes but still allows
        // flushing the memtable and updating the manifest.
        if expect_state != RegionLeaderState::Downgrading {
            if current_state == RegionRoleState::Leader(RegionLeaderState::Downgrading) {
                info!(
                    "Region {} is in downgrading leader state, updating manifest. state is {:?}",
                    manifest.metadata.region_id, expect_state
                );
            }
            ensure!(
                current_state == RegionRoleState::Leader(expect_state)
                    || current_state == RegionRoleState::Leader(RegionLeaderState::Downgrading),
                UpdateManifestSnafu {
                    region_id: manifest.metadata.region_id,
                    state: current_state,
                }
            );
        } else {
            ensure!(
                current_state == RegionRoleState::Leader(expect_state),
                RegionStateSnafu {
                    region_id: manifest.metadata.region_id,
                    state: current_state,
                    expect: RegionRoleState::Leader(expect_state),
                }
            );
        }

        for action in &action_list.actions {
            // Checks whether the edit is still applicable.
            let RegionMetaAction::Edit(edit) = &action else {
                continue;
            };

            // Checks whether the region is truncated.
            let Some(truncated_entry_id) = manifest.truncated_entry_id else {
                continue;
            };

            // This is an edit from flush.
            if let Some(flushed_entry_id) = edit.flushed_entry_id {
                ensure!(
                    truncated_entry_id < flushed_entry_id,
                    RegionTruncatedSnafu {
                        region_id: manifest.metadata.region_id,
                    }
                );
            }

            // This is an edit from compaction.
            if !edit.files_to_remove.is_empty() {
                // Checks whether the input files of the compaction task have been truncated.
                for file in &edit.files_to_remove {
                    ensure!(
                        manifest.files.contains_key(&file.file_id),
                        RegionTruncatedSnafu {
                            region_id: manifest.metadata.region_id,
                        }
                    );
                }
            }
        }

        // Now we can update the manifest.
        let version = manager.update(action_list, is_staging).await.inspect_err(
            |e| error!(e; "Failed to update manifest, region_id: {}", manifest.metadata.region_id),
        )?;

        if self.state.load() == RegionRoleState::Follower {
            warn!(
                "Region {} becomes follower while updating manifest which may cause inconsistency, manifest version: {version}",
                manifest.metadata.region_id
            );
        }

        Ok(version)
    }

    /// Sets the [`RegionRole`].
    ///
    /// ```text
    ///     +------------------------------------------+
    ///     |                      +-----------------+ |
    ///     |                      |                 | |
    /// +---+------+       +-------+-----+        +--v-v---+
    /// | Follower |       | Downgrading |        | Leader |
    /// +---^-^----+       +-----+-^-----+        +--+-+---+
    ///     | |                  | |                 | |
    ///     | +------------------+ +-----------------+ |
    ///     +------------------------------------------+
    ///
    /// Transition:
    /// - Follower -> Leader
    /// - Downgrading Leader -> Leader
    /// - Leader -> Follower
    /// - Downgrading Leader -> Follower
    /// - Leader -> Downgrading Leader
    ///
    /// ```
    pub(crate) fn set_role(&self, next_role: RegionRole, region_id: RegionId) {
        match next_role {
            RegionRole::Follower => {
                match self.state.fetch_update(|state| {
                    if !matches!(state, RegionRoleState::Follower) {
                        Some(RegionRoleState::Follower)
                    } else {
                        None
                    }
                }) {
                    Ok(state) => info!(
                        "Convert region {} to follower, previous role state: {:?}",
                        region_id, state
                    ),
                    Err(state) => {
                        if state != RegionRoleState::Follower {
                            warn!(
                                "Failed to convert region {} to follower, current role state: {:?}",
                                region_id, state
                            )
                        }
                    }
                }
            }
            RegionRole::Leader => {
                match self.state.fetch_update(|state| {
                    if matches!(
                        state,
                        RegionRoleState::Follower
                            | RegionRoleState::Leader(RegionLeaderState::Downgrading)
                    ) {
                        Some(RegionRoleState::Leader(RegionLeaderState::Writable))
                    } else {
                        None
                    }
                }) {
                    Ok(state) => info!(
                        "Convert region {} to leader, previous role state: {:?}",
                        region_id, state
                    ),
                    Err(state) => {
                        if state != RegionRoleState::Leader(RegionLeaderState::Writable) {
                            warn!(
                                "Failed to convert region {} to leader, current role state: {:?}",
                                region_id, state
                            )
                        }
                    }
                }
            }
            RegionRole::DowngradingLeader => {
                match self.state.compare_exchange(
                    RegionRoleState::Leader(RegionLeaderState::Writable),
                    RegionRoleState::Leader(RegionLeaderState::Downgrading),
                ) {
                    Ok(state) => info!(
                        "Convert region {} to downgrading region, previous role state: {:?}",
                        region_id, state
                    ),
                    Err(state) => {
                        if state != RegionRoleState::Leader(RegionLeaderState::Downgrading) {
                            warn!(
                                "Failed to convert region {} to downgrading leader, current role state: {:?}",
                                region_id, state
                            )
                        }
                    }
                }
            }
        }
    }

    /// Returns the normal manifest of the region.
    pub(crate) async fn manifest(&self) -> Arc<crate::manifest::action::RegionManifest> {
        self.manifest_manager.read().await.manifest()
    }

    /// Returns the staging manifest of the region.
    pub(crate) async fn staging_manifest(
        &self,
    ) -> Option<Arc<crate::manifest::action::RegionManifest>> {
        self.manifest_manager.read().await.staging_manifest()
    }
}

pub(crate) type ManifestContextRef = Arc<ManifestContext>;

/// Regions indexed by ids.
#[derive(Debug, Default)]
pub(crate) struct RegionMap {
    regions: RwLock<HashMap<RegionId, MitoRegionRef>>,
}

impl RegionMap {
    /// Returns true if the region exists.
    pub(crate) fn is_region_exists(&self, region_id: RegionId) -> bool {
        let regions = self.regions.read().unwrap();
        regions.contains_key(&region_id)
    }

    /// Inserts a new region into the map.
    pub(crate) fn insert_region(&self, region: MitoRegionRef) {
        let mut regions = self.regions.write().unwrap();
        regions.insert(region.region_id, region);
    }

    /// Gets region by region id.
    pub(crate) fn get_region(&self, region_id: RegionId) -> Option<MitoRegionRef> {
        let regions = self.regions.read().unwrap();
        regions.get(&region_id).cloned()
    }

    /// Gets writable region by region id.
    ///
    /// Returns error if the region does not exist or is readonly.
    pub(crate) fn writable_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
        let region = self
            .get_region(region_id)
            .context(RegionNotFoundSnafu { region_id })?;
        ensure!(
            region.is_writable(),
            RegionStateSnafu {
                region_id,
                state: region.state(),
                expect: RegionRoleState::Leader(RegionLeaderState::Writable),
            }
        );
        Ok(region)
    }

    /// Gets readonly region by region id.
    ///
    /// Returns error if the region does not exist or is writable.
    pub(crate) fn follower_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
        let region = self
            .get_region(region_id)
            .context(RegionNotFoundSnafu { region_id })?;
        ensure!(
            region.is_follower(),
            RegionStateSnafu {
                region_id,
                state: region.state(),
                expect: RegionRoleState::Follower,
            }
        );

        Ok(region)
    }

    /// Gets region by region id.
    ///
    /// Calls the callback if the region does not exist.
    pub(crate) fn get_region_or<F: OnFailure>(
        &self,
        region_id: RegionId,
        cb: &mut F,
    ) -> Option<MitoRegionRef> {
        match self
            .get_region(region_id)
            .context(RegionNotFoundSnafu { region_id })
        {
            Ok(region) => Some(region),
            Err(e) => {
                cb.on_failure(e);
                None
            }
        }
    }

    /// Gets writable region by region id.
    ///
    /// Calls the callback if the region does not exist or is readonly.
    pub(crate) fn writable_region_or<F: OnFailure>(
        &self,
        region_id: RegionId,
        cb: &mut F,
    ) -> Option<MitoRegionRef> {
        match self.writable_region(region_id) {
            Ok(region) => Some(region),
            Err(e) => {
                cb.on_failure(e);
                None
            }
        }
    }

    /// Gets writable non-staging region by region id.
    ///
    /// Returns error if the region does not exist, is readonly, or is in staging mode.
    pub(crate) fn writable_non_staging_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
        let region = self.writable_region(region_id)?;
        if region.is_staging() {
            return Err(crate::error::RegionStateSnafu {
                region_id,
                state: region.state(),
                expect: RegionRoleState::Leader(RegionLeaderState::Writable),
            }
            .build());
        }
        Ok(region)
    }

    /// Gets staging region by region id.
    ///
    /// Returns error if the region does not exist or is not in staging state.
    pub(crate) fn staging_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
        let region = self
            .get_region(region_id)
            .context(RegionNotFoundSnafu { region_id })?;
        ensure!(
            region.is_staging(),
            RegionStateSnafu {
                region_id,
                state: region.state(),
                expect: RegionRoleState::Leader(RegionLeaderState::Staging),
            }
        );
        Ok(region)
    }

    /// Gets flushable region by region id.
    ///
    /// Returns error if the region does not exist.
    /// Returns None if the region exists but is not flushable.
    fn flushable_region(&self, region_id: RegionId) -> Result<Option<MitoRegionRef>> {
        let region = self
            .get_region(region_id)
            .context(RegionNotFoundSnafu { region_id })?;
        if region.is_flushable() {
            Ok(Some(region))
        } else {
            Ok(None)
        }
    }

    /// Gets flushable region by region id.
    ///
    /// Calls the callback if the region does not exist.
    /// Returns None if the region exists but is not flushable.
    pub(crate) fn flushable_region_or<F: OnFailure>(
        &self,
        region_id: RegionId,
        cb: &mut F,
    ) -> Option<MitoRegionRef> {
        match self.flushable_region(region_id) {
            Ok(region) => region,
            Err(e) => {
                cb.on_failure(e);
                None
            }
        }
    }

    /// Remove region by id.
    pub(crate) fn remove_region(&self, region_id: RegionId) -> Option<MitoRegionRef> {
        let mut regions = self.regions.write().unwrap();
        regions.remove(&region_id)
    }

    /// List all regions.
    pub(crate) fn list_regions(&self) -> Vec<MitoRegionRef> {
        let regions = self.regions.read().unwrap();
        regions.values().cloned().collect()
    }

    /// Clear the map.
    pub(crate) fn clear(&self) {
        self.regions.write().unwrap().clear();
    }
}

pub(crate) type RegionMapRef = Arc<RegionMap>;

/// Opening regions.
#[derive(Debug, Default)]
pub(crate) struct OpeningRegions {
    regions: RwLock<HashMap<RegionId, Vec<OptionOutputTx>>>,
}

impl OpeningRegions {
    /// Registers `sender` for an opening region and returns `None` if the region is
    /// already opening; otherwise, returns the `sender` back to the caller.
    pub(crate) fn wait_for_opening_region(
        &self,
        region_id: RegionId,
        sender: OptionOutputTx,
    ) -> Option<OptionOutputTx> {
        let mut regions = self.regions.write().unwrap();
        match regions.entry(region_id) {
            Entry::Occupied(mut senders) => {
                senders.get_mut().push(sender);
                None
            }
            Entry::Vacant(_) => Some(sender),
        }
    }

    /// Returns true if the region exists.
    pub(crate) fn is_region_exists(&self, region_id: RegionId) -> bool {
        let regions = self.regions.read().unwrap();
        regions.contains_key(&region_id)
    }

    /// Inserts a new region into the map.
    pub(crate) fn insert_sender(&self, region: RegionId, sender: OptionOutputTx) {
        let mut regions = self.regions.write().unwrap();
        regions.insert(region, vec![sender]);
    }

    /// Remove region by id.
    pub(crate) fn remove_sender(&self, region_id: RegionId) -> Vec<OptionOutputTx> {
        let mut regions = self.regions.write().unwrap();
        regions.remove(&region_id).unwrap_or_default()
    }

    #[cfg(test)]
    pub(crate) fn sender_len(&self, region_id: RegionId) -> usize {
        let regions = self.regions.read().unwrap();
        if let Some(senders) = regions.get(&region_id) {
            senders.len()
        } else {
            0
        }
    }
}

pub(crate) type OpeningRegionsRef = Arc<OpeningRegions>;

/// The regions that are catching up.
#[derive(Debug, Default)]
pub(crate) struct CatchupRegions {
    regions: RwLock<HashSet<RegionId>>,
}

impl CatchupRegions {
    /// Returns true if the region exists.
    pub(crate) fn is_region_exists(&self, region_id: RegionId) -> bool {
        let regions = self.regions.read().unwrap();
        regions.contains(&region_id)
    }

    /// Inserts a new region into the set.
    pub(crate) fn insert_region(&self, region_id: RegionId) {
        let mut regions = self.regions.write().unwrap();
        regions.insert(region_id);
    }

    /// Remove region by id.
    pub(crate) fn remove_region(&self, region_id: RegionId) {
        let mut regions = self.regions.write().unwrap();
        regions.remove(&region_id);
    }
}

pub(crate) type CatchupRegionsRef = Arc<CatchupRegions>;

/// Manifest stats.
#[derive(Default, Debug, Clone)]
pub struct ManifestStats {
    pub(crate) total_manifest_size: Arc<AtomicU64>,
    pub(crate) manifest_version: Arc<AtomicU64>,
    pub(crate) file_removed_cnt: Arc<AtomicU64>,
}

impl ManifestStats {
    fn total_manifest_size(&self) -> u64 {
        self.total_manifest_size.load(Ordering::Relaxed)
    }

    fn manifest_version(&self) -> u64 {
        self.manifest_version.load(Ordering::Relaxed)
    }

    fn file_removed_cnt(&self) -> u64 {
        self.file_removed_cnt.load(Ordering::Relaxed)
    }
}

/// Parses the partition expression from a JSON string.
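/// Both `None` and the empty string are treated as "no partition expression".
///
/// A minimal sketch (illustrative, not a doctest):
/// ```ignore
/// assert!(parse_partition_expr(None).unwrap().is_none());
/// assert!(parse_partition_expr(Some("")).unwrap().is_none());
/// ```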
1412pub fn parse_partition_expr(partition_expr_str: Option<&str>) -> Result<Option<PartitionExpr>> {
1413    match partition_expr_str {
1414        None => Ok(None),
1415        Some("") => Ok(None),
1416        Some(json_str) => {
1417            let expr = partition::expr::PartitionExpr::from_json_str(json_str)
1418                .with_context(|_| InvalidPartitionExprSnafu { expr: json_str })?;
1419            Ok(expr)
1420        }
1421    }
1422}
1423
1424#[cfg(test)]
1425mod tests {
1426    use std::sync::atomic::AtomicU64;
1427    use std::sync::{Arc, Mutex};
1428
1429    use common_datasource::compression::CompressionType;
1430    use common_test_util::temp_dir::create_temp_dir;
1431    use crossbeam_utils::atomic::AtomicCell;
1432    use object_store::ObjectStore;
1433    use object_store::services::Fs;
1434    use store_api::logstore::provider::Provider;
1435    use store_api::region_engine::RegionRole;
1436    use store_api::region_request::PathType;
1437    use store_api::storage::RegionId;
1438
1439    use crate::access_layer::AccessLayer;
1440    use crate::error::Error;
1441    use crate::manifest::action::{
1442        RegionChange, RegionEdit, RegionMetaAction, RegionMetaActionList, RegionPartitionExprChange,
1443    };
1444    use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
1445    use crate::region::{
1446        ManifestContext, ManifestStats, MitoRegion, RegionLeaderState, RegionRoleState,
1447    };
1448    use crate::sst::FormatType;
1449    use crate::sst::index::intermediate::IntermediateManager;
1450    use crate::sst::index::puffin_manager::PuffinManagerFactory;
1451    use crate::test_util::scheduler_util::SchedulerEnv;
1452    use crate::test_util::version_util::VersionControlBuilder;
1453    use crate::time_provider::StdTimeProvider;
1454
1455    #[test]
1456    fn test_region_state_lock_free() {
1457        assert!(AtomicCell::<RegionRoleState>::is_lock_free());
1458    }
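
    // A minimal sketch (not from the original tests) of the OpeningRegions
    // wait/notify flow, assuming `OptionOutputTx::none()` constructs an empty
    // sender: the first caller keeps its sender, later callers are queued on
    // the in-flight open, and `remove_sender` drains the queue.
    #[test]
    fn test_opening_regions_dedup() {
        use crate::region::OpeningRegions;
        use crate::request::OptionOutputTx;

        let opening = OpeningRegions::default();
        let region_id = RegionId::new(1024, 0);

        // No open is in flight yet, so the sender is handed back to the caller.
        let sender = opening
            .wait_for_opening_region(region_id, OptionOutputTx::none())
            .unwrap();
        // Register the in-flight open; subsequent waiters are queued.
        opening.insert_sender(region_id, sender);
        assert!(opening.is_region_exists(region_id));
        assert!(
            opening
                .wait_for_opening_region(region_id, OptionOutputTx::none())
                .is_none()
        );
        assert_eq!(opening.sender_len(region_id), 2);

        // Completing the open drains all queued waiters at once.
        let senders = opening.remove_sender(region_id);
        assert_eq!(senders.len(), 2);
        assert!(!opening.is_region_exists(region_id));
    }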

    async fn build_test_region(env: &SchedulerEnv) -> MitoRegion {
        let builder = VersionControlBuilder::new();
        let version_control = Arc::new(builder.build());
        let metadata = version_control.current().version.metadata.clone();

        let manager = RegionManifestManager::new(
            metadata.clone(),
            0,
            RegionManifestOptions {
                manifest_dir: "".to_string(),
                object_store: env.access_layer.object_store().clone(),
                compress_type: CompressionType::Uncompressed,
                checkpoint_distance: 10,
                remove_file_options: Default::default(),
                manifest_cache: None,
            },
            FormatType::PrimaryKey,
            &Default::default(),
        )
        .await
        .unwrap();

        let manifest_ctx = Arc::new(ManifestContext::new(
            manager,
            RegionRoleState::Leader(RegionLeaderState::Writable),
        ));

        MitoRegion {
            region_id: metadata.region_id,
            version_control,
            access_layer: env.access_layer.clone(),
            manifest_ctx,
            file_purger: crate::test_util::new_noop_file_purger(),
            provider: Provider::noop_provider(),
            last_flush_millis: Default::default(),
            last_compaction_millis: Default::default(),
            time_provider: Arc::new(StdTimeProvider),
            topic_latest_entry_id: Default::default(),
            written_bytes: Arc::new(AtomicU64::new(0)),
            stats: ManifestStats::default(),
            staging_partition_info: Mutex::new(None),
        }
    }

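    // A minimal sketch (not from the original tests) of CatchupRegions
    // membership: a region id is tracked while the region catches up and is
    // dropped afterwards.
    #[test]
    fn test_catchup_regions_membership() {
        use crate::region::CatchupRegions;

        let catchup = CatchupRegions::default();
        let region_id = RegionId::new(1024, 0);
        assert!(!catchup.is_region_exists(region_id));

        catchup.insert_region(region_id);
        assert!(catchup.is_region_exists(region_id));

        catchup.remove_region(region_id);
        assert!(!catchup.is_region_exists(region_id));
    }
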
    fn empty_edit() -> RegionEdit {
        RegionEdit {
            files_to_add: Vec::new(),
            files_to_remove: Vec::new(),
            timestamp_ms: None,
            compaction_time_window: None,
            flushed_entry_id: None,
            flushed_sequence: None,
            committed_sequence: None,
        }
    }

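    // A minimal sketch (not from the original tests) of the ManifestStats
    // getters: they simply read the shared atomic counters that manifest
    // bookkeeping updates elsewhere.
    #[test]
    fn test_manifest_stats_counters() {
        use std::sync::atomic::Ordering;

        let stats = ManifestStats::default();
        stats.total_manifest_size.store(64, Ordering::Relaxed);
        stats.manifest_version.store(3, Ordering::Relaxed);
        stats.file_removed_cnt.store(2, Ordering::Relaxed);

        assert_eq!(stats.total_manifest_size(), 64);
        assert_eq!(stats.manifest_version(), 3);
        assert_eq!(stats.file_removed_cnt(), 2);
    }
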
    #[tokio::test]
    async fn test_exit_staging_partition_expr_change_and_edit_success() {
        let env = SchedulerEnv::new().await;
        let region = build_test_region(&env).await;

        let mut manager = region.manifest_ctx.manifest_manager.write().await;
        region.set_staging(&mut manager).await.unwrap();
        manager
            .update(
                RegionMetaActionList::new(vec![
                    RegionMetaAction::PartitionExprChange(RegionPartitionExprChange {
                        partition_expr: Some("expr_a".to_string()),
                    }),
                    RegionMetaAction::Edit(empty_edit()),
                ]),
                true,
            )
            .await
            .unwrap();

        region.exit_staging_on_success(&mut manager).await.unwrap();
        drop(manager);

        assert_eq!(
            region.version().metadata.partition_expr.as_deref(),
            Some("expr_a")
        );
        assert_eq!(
            region.state(),
            RegionRoleState::Leader(RegionLeaderState::Writable)
        );
    }

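    // A minimal sketch (not from the original tests) of parse_partition_expr's
    // contract: absent and empty expressions mean "no partition expression",
    // while malformed JSON is assumed to be rejected by
    // `PartitionExpr::from_json_str` and surfaces as an error.
    #[test]
    fn test_parse_partition_expr_contract() {
        use crate::region::parse_partition_expr;

        assert!(parse_partition_expr(None).unwrap().is_none());
        assert!(parse_partition_expr(Some("")).unwrap().is_none());
        assert!(parse_partition_expr(Some("not-json")).is_err());
    }
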
    #[tokio::test]
    async fn test_exit_staging_change_with_same_columns_success() {
        let env = SchedulerEnv::new().await;
        let region = build_test_region(&env).await;

        let mut manager = region.manifest_ctx.manifest_manager.write().await;
        region.set_staging(&mut manager).await.unwrap();

        let mut changed_metadata = region.version().metadata.as_ref().clone();
        changed_metadata.set_partition_expr(Some("expr_b".to_string()));

        manager
            .update(
                RegionMetaActionList::new(vec![
                    RegionMetaAction::Change(RegionChange {
                        metadata: Arc::new(changed_metadata),
                        sst_format: FormatType::PrimaryKey,
                        append_mode: None,
                    }),
                    RegionMetaAction::Edit(empty_edit()),
                ]),
                true,
            )
            .await
            .unwrap();

        region.exit_staging_on_success(&mut manager).await.unwrap();
        drop(manager);

        assert_eq!(
            region.version().metadata.partition_expr.as_deref(),
            Some("expr_b")
        );
        assert_eq!(
            region.state(),
            RegionRoleState::Leader(RegionLeaderState::Writable)
        );
    }

    #[tokio::test]
    async fn test_exit_staging_change_with_different_columns_fails() {
        let env = SchedulerEnv::new().await;
        let region = build_test_region(&env).await;

        let mut manager = region.manifest_ctx.manifest_manager.write().await;
        region.set_staging(&mut manager).await.unwrap();

        let mut changed_metadata = region.version().metadata.as_ref().clone();
        changed_metadata.column_metadatas.rotate_left(1);

        manager
            .update(
                RegionMetaActionList::new(vec![
                    RegionMetaAction::Change(RegionChange {
                        metadata: Arc::new(changed_metadata),
                        sst_format: FormatType::PrimaryKey,
                        append_mode: None,
                    }),
                    RegionMetaAction::Edit(empty_edit()),
                ]),
                true,
            )
            .await
            .unwrap();

        let result = region.exit_staging_on_success(&mut manager).await;
        assert!(matches!(result, Err(Error::Unexpected { .. })));
    }

    #[tokio::test]
    async fn test_exit_staging_partition_expr_change_and_change_conflict_fails() {
        let env = SchedulerEnv::new().await;
        let region = build_test_region(&env).await;

        let mut manager = region.manifest_ctx.manifest_manager.write().await;
        region.set_staging(&mut manager).await.unwrap();

        let mut changed_metadata = region.version().metadata.as_ref().clone();
        changed_metadata.set_partition_expr(Some("expr_c".to_string()));

        manager
            .update(
                RegionMetaActionList::new(vec![
                    RegionMetaAction::PartitionExprChange(RegionPartitionExprChange {
                        partition_expr: Some("expr_c".to_string()),
                    }),
                    RegionMetaAction::Change(RegionChange {
                        metadata: Arc::new(changed_metadata),
                        sst_format: FormatType::PrimaryKey,
                        append_mode: None,
                    }),
                    RegionMetaAction::Edit(empty_edit()),
                ]),
                true,
            )
            .await
            .unwrap();

        let result = region.exit_staging_on_success(&mut manager).await;
        assert!(matches!(result, Err(Error::Unexpected { .. })));
    }

    #[tokio::test]
    async fn test_set_region_state() {
        let env = SchedulerEnv::new().await;
        let builder = VersionControlBuilder::new();
        let version_control = Arc::new(builder.build());
        let manifest_ctx = env
            .mock_manifest_context(version_control.current().version.metadata.clone())
            .await;

        let region_id = RegionId::new(1024, 0);
        // Leader -> Follower
        manifest_ctx.set_role(RegionRole::Follower, region_id);
        assert_eq!(manifest_ctx.state.load(), RegionRoleState::Follower);

        // Follower -> Leader
        manifest_ctx.set_role(RegionRole::Leader, region_id);
        assert_eq!(
            manifest_ctx.state.load(),
            RegionRoleState::Leader(RegionLeaderState::Writable)
        );

        // Leader -> Downgrading Leader
        manifest_ctx.set_role(RegionRole::DowngradingLeader, region_id);
        assert_eq!(
            manifest_ctx.state.load(),
            RegionRoleState::Leader(RegionLeaderState::Downgrading)
        );

        // Downgrading Leader -> Follower
        manifest_ctx.set_role(RegionRole::Follower, region_id);
        assert_eq!(manifest_ctx.state.load(), RegionRoleState::Follower);

        // Can't downgrade from follower (Follower -> Downgrading Leader)
        manifest_ctx.set_role(RegionRole::DowngradingLeader, region_id);
        assert_eq!(manifest_ctx.state.load(), RegionRoleState::Follower);

        // Set region role to Downgrading Leader
        manifest_ctx.set_role(RegionRole::Leader, region_id);
        manifest_ctx.set_role(RegionRole::DowngradingLeader, region_id);
        assert_eq!(
            manifest_ctx.state.load(),
            RegionRoleState::Leader(RegionLeaderState::Downgrading)
        );

        // Downgrading Leader -> Leader
        manifest_ctx.set_role(RegionRole::Leader, region_id);
        assert_eq!(
            manifest_ctx.state.load(),
            RegionRoleState::Leader(RegionLeaderState::Writable)
        );
    }

    #[tokio::test]
    async fn test_staging_state_validation() {
        let env = SchedulerEnv::new().await;
        let builder = VersionControlBuilder::new();
        let version_control = Arc::new(builder.build());

        // Create a context in the staging state, following the setup pattern from SchedulerEnv
        let staging_ctx = {
            let manager = RegionManifestManager::new(
                version_control.current().version.metadata.clone(),
                0,
                RegionManifestOptions {
                    manifest_dir: "".to_string(),
                    object_store: env.access_layer.object_store().clone(),
                    compress_type: CompressionType::Uncompressed,
                    checkpoint_distance: 10,
                    remove_file_options: Default::default(),
                    manifest_cache: None,
                },
                FormatType::PrimaryKey,
                &Default::default(),
            )
            .await
            .unwrap();
            Arc::new(ManifestContext::new(
                manager,
                RegionRoleState::Leader(RegionLeaderState::Staging),
            ))
        };

        // Test staging state behavior
        assert_eq!(
            staging_ctx.current_state(),
            RegionRoleState::Leader(RegionLeaderState::Staging)
        );

        // Test writable context for comparison
        let writable_ctx = env
            .mock_manifest_context(version_control.current().version.metadata.clone())
            .await;

        assert_eq!(
            writable_ctx.current_state(),
            RegionRoleState::Leader(RegionLeaderState::Writable)
        );
    }

    #[tokio::test]
    async fn test_staging_state_transitions() {
        let builder = VersionControlBuilder::new();
        let version_control = Arc::new(builder.build());
        let metadata = version_control.current().version.metadata.clone();

        // Create MitoRegion for testing state transitions
        let temp_dir = create_temp_dir("");
        let path_str = temp_dir.path().display().to_string();
        let fs_builder = Fs::default().root(&path_str);
        let object_store = ObjectStore::new(fs_builder).unwrap().finish();

        let index_aux_path = temp_dir.path().join("index_aux");
        let puffin_mgr = PuffinManagerFactory::new(&index_aux_path, 4096, None, None)
            .await
            .unwrap();
        let intm_mgr = IntermediateManager::init_fs(index_aux_path.to_str().unwrap())
            .await
            .unwrap();

        let access_layer = Arc::new(AccessLayer::new(
            "",
            PathType::Bare,
            object_store,
            puffin_mgr,
            intm_mgr,
        ));

        let manager = RegionManifestManager::new(
            metadata.clone(),
            0,
            RegionManifestOptions {
                manifest_dir: "".to_string(),
                object_store: access_layer.object_store().clone(),
                compress_type: CompressionType::Uncompressed,
                checkpoint_distance: 10,
                remove_file_options: Default::default(),
                manifest_cache: None,
            },
            FormatType::PrimaryKey,
            &Default::default(),
        )
        .await
        .unwrap();

        let manifest_ctx = Arc::new(ManifestContext::new(
            manager,
            RegionRoleState::Leader(RegionLeaderState::Writable),
        ));

        let region = MitoRegion {
            region_id: metadata.region_id,
            version_control,
            access_layer,
            manifest_ctx: manifest_ctx.clone(),
            file_purger: crate::test_util::new_noop_file_purger(),
            provider: Provider::noop_provider(),
            last_flush_millis: Default::default(),
            last_compaction_millis: Default::default(),
            time_provider: Arc::new(StdTimeProvider),
            topic_latest_entry_id: Default::default(),
            written_bytes: Arc::new(AtomicU64::new(0)),
            stats: ManifestStats::default(),
            staging_partition_info: Mutex::new(None),
        };

        // Test initial state
        assert_eq!(
            region.state(),
            RegionRoleState::Leader(RegionLeaderState::Writable)
        );
        assert!(!region.is_staging());

        // Test transition to staging
        let mut manager = manifest_ctx.manifest_manager.write().await;
        region.set_staging(&mut manager).await.unwrap();
        drop(manager);
        assert_eq!(
            region.state(),
            RegionRoleState::Leader(RegionLeaderState::Staging)
        );
        assert!(region.is_staging());

        // Test transition back to writable
        region.exit_staging().unwrap();
        assert_eq!(
            region.state(),
            RegionRoleState::Leader(RegionLeaderState::Writable)
        );
        assert!(!region.is_staging());

        // Test staging directory cleanup: Create dirty staging files before entering staging mode
        {
            // Create some dummy staging manifest files to simulate an interrupted session
            let manager = manifest_ctx.manifest_manager.write().await;
            let dummy_actions = RegionMetaActionList::new(vec![]);
            let dummy_bytes = dummy_actions.encode().unwrap();

            // Create dirty staging files with versions 100 and 101
            manager.store().save(100, &dummy_bytes, true).await.unwrap();
            manager.store().save(101, &dummy_bytes, true).await.unwrap();
            drop(manager);

            // Verify dirty files exist before entering staging
            let manager = manifest_ctx.manifest_manager.read().await;
            let dirty_manifests = manager.store().fetch_staging_manifests().await.unwrap();
            assert_eq!(
                dirty_manifests.len(),
                2,
                "Should have 2 dirty staging files"
            );
            drop(manager);

            // Enter staging mode - this should clean up the dirty files
            let mut manager = manifest_ctx.manifest_manager.write().await;
            region.set_staging(&mut manager).await.unwrap();
            drop(manager);

            // Verify dirty files are cleaned up after entering staging
            let manager = manifest_ctx.manifest_manager.read().await;
            let cleaned_manifests = manager.store().fetch_staging_manifests().await.unwrap();
            assert_eq!(
                cleaned_manifests.len(),
                0,
                "Dirty staging files should be cleaned up"
            );
            drop(manager);

            // Exit staging to restore normal state for remaining tests
            region.exit_staging().unwrap();
        }

        // Test invalid transitions
        let mut manager = manifest_ctx.manifest_manager.write().await;
        assert!(region.set_staging(&mut manager).await.is_ok()); // Writable -> Staging should work
        drop(manager);
        let mut manager = manifest_ctx.manifest_manager.write().await;
        assert!(region.set_staging(&mut manager).await.is_err()); // Staging -> Staging should fail
        drop(manager);
        assert!(region.exit_staging().is_ok()); // Staging -> Writable should work
        assert!(region.exit_staging().is_err()); // Writable -> Writable should fail
    }
}