mito2/region.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Mito region.

pub mod catchup;
pub mod opener;
pub mod options;
pub mod utils;
pub(crate) mod version;

use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
use std::sync::{Arc, Mutex, RwLock};

use common_base::hash::partition_expr_version;
use common_telemetry::{error, info, warn};
use crossbeam_utils::atomic::AtomicCell;
use partition::expr::PartitionExpr;
use snafu::{OptionExt, ResultExt, ensure};
use store_api::ManifestVersion;
use store_api::codec::PrimaryKeyEncoding;
use store_api::logstore::provider::Provider;
use store_api::metadata::RegionMetadataRef;
use store_api::region_engine::{
    RegionManifestInfo, RegionRole, RegionStatistic, SettableRegionRoleState,
};
use store_api::region_request::{PathType, StagingPartitionDirective};
use store_api::sst_entry::ManifestSstEntry;
use store_api::storage::{FileId, RegionId, SequenceNumber};
use tokio::sync::RwLockWriteGuard;
pub use utils::*;

use crate::access_layer::AccessLayerRef;
use crate::error::{
    InvalidPartitionExprSnafu, RegionNotFoundSnafu, RegionStateSnafu, RegionTruncatedSnafu, Result,
    UnexpectedSnafu, UpdateManifestSnafu,
};
use crate::manifest::action::{
    RegionChange, RegionManifest, RegionMetaAction, RegionMetaActionList,
};
use crate::manifest::manager::RegionManifestManager;
use crate::region::version::{VersionControlRef, VersionRef};
use crate::request::{OnFailure, OptionOutputTx};
use crate::sst::file::FileMeta;
use crate::sst::file_purger::FilePurgerRef;
use crate::sst::location::{index_file_path, sst_file_path};
use crate::time_provider::TimeProviderRef;

/// Approximate factor used to estimate the WAL size from the memtable size.
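/// For illustration only (the factor is an estimate, not a measured guarantee):
/// a region holding 100 MiB of memtable data is assumed to account for roughly
/// 100 MiB * 0.42825 ≈ 42.8 MiB of WAL. See `estimated_wal_usage` below.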
const ESTIMATED_WAL_FACTOR: f32 = 0.42825;

/// Region usage includes the region id, WAL usage, SST usage, and manifest usage.
#[derive(Debug)]
pub struct RegionUsage {
    pub region_id: RegionId,
    pub wal_usage: u64,
    pub sst_usage: u64,
    pub manifest_usage: u64,
}

impl RegionUsage {
    pub fn disk_usage(&self) -> u64 {
        self.wal_usage + self.sst_usage + self.manifest_usage
    }
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RegionLeaderState {
    /// The region is opened and is writable.
    Writable,
    /// The region is in staging mode - writable but no checkpoint/compaction.
    Staging,
    /// The region is entering staging mode; write requests will be stalled.
    EnteringStaging,
    /// The region is altering.
    Altering,
    /// The region is dropping.
    Dropping,
    /// The region is truncating.
    Truncating,
    /// The region is handling a region edit.
    Editing,
    /// The region is stepping down.
    Downgrading,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RegionRoleState {
    Leader(RegionLeaderState),
    Follower,
}

impl RegionRoleState {
    /// Converts the region role state to leader state if it is a leader state.
    pub fn into_leader_state(self) -> Option<RegionLeaderState> {
        match self {
            RegionRoleState::Leader(leader_state) => Some(leader_state),
            RegionRoleState::Follower => None,
        }
    }
}

/// Metadata and runtime status of a region.
///
/// Writing and reading a region follow a single-writer-multi-reader rule:
/// - Only the region worker thread this region belongs to can modify the metadata.
/// - Multiple reader threads are allowed to read a specific `version` of a region.
#[derive(Debug)]
pub struct MitoRegion {
    /// Id of this region.
    ///
    /// Accessing region id from the version control is inconvenient so
    /// we also store it here.
    pub(crate) region_id: RegionId,

    /// Version controller for this region.
    ///
    /// We MUST update the version control inside the write lock of the region manifest manager.
    pub(crate) version_control: VersionControlRef,
    /// SSTs accessor for this region.
    pub(crate) access_layer: AccessLayerRef,
    /// Context to maintain manifest for this region.
    pub(crate) manifest_ctx: ManifestContextRef,
    /// SST file purger.
    pub(crate) file_purger: FilePurgerRef,
    /// The provider of log store.
    pub(crate) provider: Provider,
    /// Last flush time in millis.
    last_flush_millis: AtomicI64,
    /// Last compaction time in millis.
    last_compaction_millis: AtomicI64,
    /// Provider to get current time.
    time_provider: TimeProviderRef,
    /// The topic's latest entry id as of the region's last flush.
    /// **Only used for remote WAL pruning.**
    ///
    /// The value will be updated to the latest offset of the topic
    /// if the region receives a flush request or schedules a periodic flush task
    /// and the region's memtable is empty.
    ///
    /// There are no WAL entries in range [flushed_entry_id, topic_latest_entry_id] for the current region,
    /// which means these WAL entries may be pruned up to `topic_latest_entry_id`.
    pub(crate) topic_latest_entry_id: AtomicU64,
    /// The total bytes written to the region.
    pub(crate) written_bytes: Arc<AtomicU64>,
    /// Partition info of the region in staging mode.
    ///
    /// During the staging mode, the region metadata in [`VersionControlRef`] is not updated,
    /// so we need to store the partition info separately.
    pub(crate) staging_partition_info: Mutex<Option<StagingPartitionInfo>>,
    /// Manifest stats.
    stats: ManifestStats,
}

pub type MitoRegionRef = Arc<MitoRegion>;

#[derive(Debug, Clone)]
pub(crate) struct StagingPartitionInfo {
    pub(crate) partition_directive: StagingPartitionDirective,
    pub(crate) partition_rule_version: u64,
}

impl StagingPartitionInfo {
    /// Returns the partition expression carried by the staging directive, if any.
    pub(crate) fn partition_expr(&self) -> Option<&str> {
        self.partition_directive.partition_expr()
    }

    /// Builds staging partition info from a directive and derives its version marker.
    pub(crate) fn from_partition_directive(partition_directive: StagingPartitionDirective) -> Self {
        let partition_rule_version = match &partition_directive {
            StagingPartitionDirective::UpdatePartitionExpr(expr) => {
                partition_expr_version(Some(expr))
            }
            StagingPartitionDirective::RejectAllWrites => 0,
        };
        Self {
            partition_directive,
            partition_rule_version,
        }
    }
}
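
// Illustration of the mapping above (based only on the match arms): an
// `UpdatePartitionExpr` directive derives its version marker from the new
// expression via `partition_expr_version`, while a `RejectAllWrites`
// directive pins the marker to 0 as a sentinel.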

impl MitoRegion {
    /// Stop background managers for this region.
    pub(crate) async fn stop(&self) {
        self.manifest_ctx
            .manifest_manager
            .write()
            .await
            .stop()
            .await;

        info!(
            "Stopped region manifest manager, region_id: {}",
            self.region_id
        );
    }

    /// Returns current metadata of the region.
    pub fn metadata(&self) -> RegionMetadataRef {
        let version_data = self.version_control.current();
        version_data.version.metadata.clone()
    }

    /// Returns primary key encoding of the region.
    pub(crate) fn primary_key_encoding(&self) -> PrimaryKeyEncoding {
        let version_data = self.version_control.current();
        version_data.version.metadata.primary_key_encoding
    }

    /// Returns current version of the region.
    pub(crate) fn version(&self) -> VersionRef {
        let version_data = self.version_control.current();
        version_data.version
    }

    /// Returns last flush timestamp in millis.
    pub(crate) fn last_flush_millis(&self) -> i64 {
        self.last_flush_millis.load(Ordering::Relaxed)
    }

    /// Update flush time to current time.
    pub(crate) fn update_flush_millis(&self) {
        let now = self.time_provider.current_time_millis();
        self.last_flush_millis.store(now, Ordering::Relaxed);
    }

    /// Returns last compaction timestamp in millis.
    pub(crate) fn last_compaction_millis(&self) -> i64 {
        self.last_compaction_millis.load(Ordering::Relaxed)
    }

    /// Update compaction time to current time.
    pub(crate) fn update_compaction_millis(&self) {
        let now = self.time_provider.current_time_millis();
        self.last_compaction_millis.store(now, Ordering::Relaxed);
    }

    /// Returns the table dir.
    pub(crate) fn table_dir(&self) -> &str {
        self.access_layer.table_dir()
    }

    /// Returns the path type of the region.
    pub(crate) fn path_type(&self) -> PathType {
        self.access_layer.path_type()
    }

    /// Returns whether the region is writable.
    pub(crate) fn is_writable(&self) -> bool {
        matches!(
            self.manifest_ctx.state.load(),
            RegionRoleState::Leader(RegionLeaderState::Writable)
                | RegionRoleState::Leader(RegionLeaderState::Staging)
        )
    }

    /// Returns whether the region is flushable.
    pub(crate) fn is_flushable(&self) -> bool {
        matches!(
            self.manifest_ctx.state.load(),
            RegionRoleState::Leader(RegionLeaderState::Writable)
                | RegionRoleState::Leader(RegionLeaderState::Staging)
                | RegionRoleState::Leader(RegionLeaderState::Downgrading)
        )
    }

    /// Returns whether the region should abort index building.
    pub(crate) fn should_abort_index(&self) -> bool {
        matches!(
            self.manifest_ctx.state.load(),
            RegionRoleState::Follower
                | RegionRoleState::Leader(RegionLeaderState::Dropping)
                | RegionRoleState::Leader(RegionLeaderState::Truncating)
                | RegionRoleState::Leader(RegionLeaderState::Downgrading)
                | RegionRoleState::Leader(RegionLeaderState::Staging)
        )
    }

    /// Returns whether the region is downgrading.
    pub(crate) fn is_downgrading(&self) -> bool {
        matches!(
            self.manifest_ctx.state.load(),
            RegionRoleState::Leader(RegionLeaderState::Downgrading)
        )
    }

    /// Returns whether the region is in staging mode.
    #[allow(dead_code)]
    pub(crate) fn is_staging(&self) -> bool {
        self.manifest_ctx.state.load() == RegionRoleState::Leader(RegionLeaderState::Staging)
    }

    pub fn region_id(&self) -> RegionId {
        self.region_id
    }

    pub fn find_committed_sequence(&self) -> SequenceNumber {
        self.version_control.committed_sequence()
    }

    /// Returns whether the region is a follower (readonly).
    pub fn is_follower(&self) -> bool {
        self.manifest_ctx.state.load() == RegionRoleState::Follower
    }

    /// Returns the state of the region.
    pub(crate) fn state(&self) -> RegionRoleState {
        self.manifest_ctx.state.load()
    }

    /// Sets the region role state.
    pub(crate) fn set_role(&self, next_role: RegionRole) {
        self.manifest_ctx.set_role(next_role, self.region_id);
    }

    /// Sets the altering state.
    /// You should call this method in the worker loop.
    pub(crate) fn set_altering(&self) -> Result<()> {
        self.compare_exchange_state(
            RegionLeaderState::Writable,
            RegionRoleState::Leader(RegionLeaderState::Altering),
        )
    }

    /// Sets the dropping state.
    /// You should call this method in the worker loop.
    pub(crate) fn set_dropping(&self, expect: RegionLeaderState) -> Result<()> {
        self.compare_exchange_state(expect, RegionRoleState::Leader(RegionLeaderState::Dropping))
    }

    /// Sets the truncating state.
    /// You should call this method in the worker loop.
    pub(crate) fn set_truncating(&self) -> Result<()> {
        self.compare_exchange_state(
            RegionLeaderState::Writable,
            RegionRoleState::Leader(RegionLeaderState::Truncating),
        )
    }

    /// Sets the editing state.
    /// You should call this method in the worker loop.
    pub(crate) fn set_editing(&self, expect: RegionLeaderState) -> Result<()> {
        self.compare_exchange_state(expect, RegionRoleState::Leader(RegionLeaderState::Editing))
    }

    /// Sets the staging state.
    ///
    /// You should call this method in the worker loop.
    /// Transitions from Writable to Staging state.
    /// Cleans any existing staging manifests before entering staging mode.
    pub(crate) async fn set_staging(
        &self,
        manager: &mut RwLockWriteGuard<'_, RegionManifestManager>,
    ) -> Result<()> {
        manager.store().clear_staging_manifests().await?;

        self.compare_exchange_state(
            RegionLeaderState::Writable,
            RegionRoleState::Leader(RegionLeaderState::Staging),
        )
    }

    /// Sets the entering staging state.
    pub(crate) fn set_entering_staging(&self) -> Result<()> {
        self.compare_exchange_state(
            RegionLeaderState::Writable,
            RegionRoleState::Leader(RegionLeaderState::EnteringStaging),
        )
    }

    /// Exits the staging state back to writable.
    ///
    /// You should call this method in the worker loop.
    /// Transitions from Staging to Writable state.
    pub fn exit_staging(&self) -> Result<()> {
        *self.staging_partition_info.lock().unwrap() = None;
        self.compare_exchange_state(
            RegionLeaderState::Staging,
            RegionRoleState::Leader(RegionLeaderState::Writable),
        )
    }

    /// Sets the region role state gracefully. This acquires the manifest write lock.
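    ///
    /// Allowed transitions, as implemented by the match below:
    /// - `Leader`: Staging -> Writable (merges staged manifests); Writable is a no-op.
    /// - `StagingLeader`: Writable -> Staging; Staging is a no-op.
    /// - `Follower`: any leader state (exiting staging first) -> Follower.
    /// - `DowngradingLeader`: Writable or Staging -> Downgrading; other states only log a warning.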
    pub(crate) async fn set_role_state_gracefully(
        &self,
        state: SettableRegionRoleState,
    ) -> Result<()> {
        let mut manager: RwLockWriteGuard<'_, RegionManifestManager> =
            self.manifest_ctx.manifest_manager.write().await;
        let current_state = self.state();

        match state {
            SettableRegionRoleState::Leader => {
                // Exit staging mode and return to normal writable leader.
                // Only allowed from staging state.
                match current_state {
                    RegionRoleState::Leader(RegionLeaderState::Staging) => {
                        info!("Exiting staging mode for region {}", self.region_id);
                        // Use the success exit path that merges all staged manifests.
                        self.exit_staging_on_success(&mut manager).await?;
                    }
                    RegionRoleState::Leader(RegionLeaderState::Writable) => {
                        // Already in desired state - no-op.
                        info!("Region {} already in normal leader mode", self.region_id);
                    }
                    _ => {
                        // Only staging -> leader transition is allowed.
                        return Err(RegionStateSnafu {
                            region_id: self.region_id,
                            state: current_state,
                            expect: RegionRoleState::Leader(RegionLeaderState::Staging),
                        }
                        .build());
                    }
                }
            }

            SettableRegionRoleState::StagingLeader => {
                // Enter staging mode from normal writable leader.
                // Only allowed from writable leader state.
                match current_state {
                    RegionRoleState::Leader(RegionLeaderState::Writable) => {
                        info!("Entering staging mode for region {}", self.region_id);
                        self.set_staging(&mut manager).await?;
                    }
                    RegionRoleState::Leader(RegionLeaderState::Staging) => {
                        // Already in desired state - no-op.
                        info!("Region {} already in staging mode", self.region_id);
                    }
                    _ => {
                        return Err(RegionStateSnafu {
                            region_id: self.region_id,
                            state: current_state,
                            expect: RegionRoleState::Leader(RegionLeaderState::Writable),
                        }
                        .build());
                    }
                }
            }

            SettableRegionRoleState::Follower => {
                // Make this region a follower.
                match current_state {
                    RegionRoleState::Leader(RegionLeaderState::Staging) => {
                        info!(
                            "Exiting staging and demoting region {} to follower",
                            self.region_id
                        );
                        self.exit_staging()?;
                        self.set_role(RegionRole::Follower);
                    }
                    RegionRoleState::Leader(_) => {
                        info!("Demoting region {} from leader to follower", self.region_id);
                        self.set_role(RegionRole::Follower);
                    }
                    RegionRoleState::Follower => {
                        // Already in desired state - no-op.
                        info!("Region {} already in follower mode", self.region_id);
                    }
                }
            }

            SettableRegionRoleState::DowngradingLeader => {
                // Downgrade this region to downgrading leader.
                match current_state {
                    RegionRoleState::Leader(RegionLeaderState::Staging) => {
                        info!(
                            "Exiting staging and entering downgrade for region {}",
                            self.region_id
                        );
                        self.exit_staging()?;
                        self.set_role(RegionRole::DowngradingLeader);
                    }
                    RegionRoleState::Leader(RegionLeaderState::Writable) => {
                        info!("Starting downgrade for region {}", self.region_id);
                        self.set_role(RegionRole::DowngradingLeader);
                    }
                    RegionRoleState::Leader(RegionLeaderState::Downgrading) => {
                        // Already in desired state - no-op.
                        info!("Region {} already in downgrading mode", self.region_id);
                    }
                    _ => {
                        warn!(
                            "Cannot start downgrade for region {} from state {:?}",
                            self.region_id, current_state
                        );
                    }
                }
            }
        }

        // Hack(zhongzc): If we have just become leader (writable), persist any backfilled metadata.
        if self.state() == RegionRoleState::Leader(RegionLeaderState::Writable) {
            // Persist backfilled metadata if the manifest is missing fields (e.g., partition_expr).
            let manifest_meta = &manager.manifest().metadata;
            let current_version = self.version();
            let current_meta = &current_version.metadata;
            if manifest_meta.partition_expr.is_none() && current_meta.partition_expr.is_some() {
                let action = RegionMetaAction::Change(RegionChange {
                    metadata: current_meta.clone(),
                    sst_format: current_version.options.sst_format.unwrap_or_default(),
                    append_mode: None,
                });
                let result = manager
                    .update(RegionMetaActionList::with_action(action), false)
                    .await;

                match result {
                    Ok(version) => {
                        info!(
                            "Successfully persisted backfilled metadata for region {}, version: {}",
                            self.region_id, version
                        );
                    }
                    Err(e) => {
                        warn!(e; "Failed to persist backfilled metadata for region {}", self.region_id);
                    }
                }
            }
        }

        drop(manager);

        Ok(())
    }

    /// Switches the region state to `RegionRoleState::Leader(RegionLeaderState::Writable)` if the current state is `expect`.
    /// Otherwise, logs an error.
    pub(crate) fn switch_state_to_writable(&self, expect: RegionLeaderState) {
        if let Err(e) = self
            .compare_exchange_state(expect, RegionRoleState::Leader(RegionLeaderState::Writable))
        {
            error!(e; "failed to switch region state to writable, expect state is {:?}", expect);
        }
    }

    /// Switches the region state to `RegionRoleState::Leader(RegionLeaderState::Staging)` if the current state is `expect`.
    /// Otherwise, logs an error.
    pub(crate) fn switch_state_to_staging(&self, expect: RegionLeaderState) {
        if let Err(e) =
            self.compare_exchange_state(expect, RegionRoleState::Leader(RegionLeaderState::Staging))
        {
            error!(e; "failed to switch region state to staging, expect state is {:?}", expect);
        }
    }

    /// Returns the region statistic.
    pub(crate) fn region_statistic(&self) -> RegionStatistic {
        let version = self.version();
        let memtables = &version.memtables;
        let memtable_usage = (memtables.mutable_usage() + memtables.immutables_usage()) as u64;

        let sst_usage = version.ssts.sst_usage();
        let index_usage = version.ssts.index_usage();
        let flushed_entry_id = version.flushed_entry_id;

        let wal_usage = self.estimated_wal_usage(memtable_usage);
        let manifest_usage = self.stats.total_manifest_size();
        let num_rows = version.ssts.num_rows() + version.memtables.num_rows();
        let num_files = version.ssts.num_files();
        let manifest_version = self.stats.manifest_version();
        let file_removed_cnt = self.stats.file_removed_cnt();

        let topic_latest_entry_id = self.topic_latest_entry_id.load(Ordering::Relaxed);
        let written_bytes = self.written_bytes.load(Ordering::Relaxed);

        RegionStatistic {
            num_rows,
            memtable_size: memtable_usage,
            wal_size: wal_usage,
            manifest_size: manifest_usage,
            sst_size: sst_usage,
            sst_num: num_files,
            index_size: index_usage,
            manifest: RegionManifestInfo::Mito {
                manifest_version,
                flushed_entry_id,
                file_removed_cnt,
            },
            data_topic_latest_entry_id: topic_latest_entry_id,
            metadata_topic_latest_entry_id: topic_latest_entry_id,
            written_bytes,
        }
    }

    /// Estimated WAL size in bytes.
    /// Uses the memtable size to estimate the WAL size.
    fn estimated_wal_usage(&self, memtable_usage: u64) -> u64 {
        ((memtable_usage as f32) * ESTIMATED_WAL_FACTOR) as u64
    }

    /// Sets the state of the region to the given state if the current state equals
    /// the expected one.
    fn compare_exchange_state(
        &self,
        expect: RegionLeaderState,
        state: RegionRoleState,
    ) -> Result<()> {
        self.manifest_ctx
            .state
            .compare_exchange(RegionRoleState::Leader(expect), state)
            .map_err(|actual| {
                RegionStateSnafu {
                    region_id: self.region_id,
                    state: actual,
                    expect: RegionRoleState::Leader(expect),
                }
                .build()
            })?;
        Ok(())
    }
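
    // Usage sketch: the `set_*` helpers above are thin wrappers around this
    // CAS. For example, `set_truncating` is exactly
    // `compare_exchange_state(Writable, Leader(Truncating))`, so a region that
    // is currently `Altering` gets a `RegionState` error instead of silently
    // switching states.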

    pub fn access_layer(&self) -> AccessLayerRef {
        self.access_layer.clone()
    }

    /// Returns the SST entries of the region.
    pub async fn manifest_sst_entries(&self) -> Vec<ManifestSstEntry> {
        let table_dir = self.table_dir();
        let path_type = self.access_layer.path_type();

        let visible_ssts = self
            .version()
            .ssts
            .levels()
            .iter()
            .flat_map(|level| level.files().map(|file| file.file_id().file_id()))
            .collect::<HashSet<_>>();

        let manifest_files = self.manifest_ctx.manifest().await.files.clone();
        let staging_files = self
            .manifest_ctx
            .staging_manifest()
            .await
            .map(|m| m.files.clone())
            .unwrap_or_default();
        let files = manifest_files
            .into_iter()
            .chain(staging_files.into_iter())
            .collect::<HashMap<_, _>>();

        files
            .values()
            .map(|meta| {
                let region_id = self.region_id;
                let origin_region_id = meta.region_id;
                let (index_version, index_file_path, index_file_size) = if meta.index_file_size > 0
                {
                    let index_file_path = index_file_path(table_dir, meta.index_id(), path_type);
                    (
                        meta.index_version,
                        Some(index_file_path),
                        Some(meta.index_file_size),
                    )
                } else {
                    (0, None, None)
                };
                let visible = visible_ssts.contains(&meta.file_id);
                ManifestSstEntry {
                    table_dir: table_dir.to_string(),
                    region_id,
                    table_id: region_id.table_id(),
                    region_number: region_id.region_number(),
                    region_group: region_id.region_group(),
                    region_sequence: region_id.region_sequence(),
                    file_id: meta.file_id.to_string(),
                    index_version,
                    level: meta.level,
                    file_path: sst_file_path(table_dir, meta.file_id(), path_type),
                    file_size: meta.file_size,
                    index_file_path,
                    index_file_size,
                    num_rows: meta.num_rows,
                    num_row_groups: meta.num_row_groups,
                    num_series: Some(meta.num_series),
                    min_ts: meta.time_range.0,
                    max_ts: meta.time_range.1,
                    sequence: meta.sequence.map(|s| s.get()),
                    origin_region_id,
                    node_id: None,
                    visible,
                }
            })
            .collect()
    }

    /// Returns the file metas of the region by file ids.
    pub async fn file_metas(&self, file_ids: &[FileId]) -> Vec<Option<FileMeta>> {
        let manifest_files = self.manifest_ctx.manifest().await.files.clone();

        file_ids
            .iter()
            .map(|file_id| manifest_files.get(file_id).cloned())
            .collect::<Vec<_>>()
    }

    /// Exits staging mode successfully by merging all staged manifests and making them visible.
    pub(crate) async fn exit_staging_on_success(
        &self,
        manager: &mut RwLockWriteGuard<'_, RegionManifestManager>,
    ) -> Result<()> {
        let current_state = self.manifest_ctx.current_state();
        ensure!(
            current_state == RegionRoleState::Leader(RegionLeaderState::Staging),
            RegionStateSnafu {
                region_id: self.region_id,
                state: current_state,
                expect: RegionRoleState::Leader(RegionLeaderState::Staging),
            }
        );

        // Merge all staged manifest actions.
        let merged_actions = match manager.merge_staged_actions(current_state).await? {
            Some(actions) => actions,
            None => {
                info!(
                    "No staged manifests to merge for region {}, exiting staging mode without changes",
                    self.region_id
                );
                // Even if there are no manifests to merge, we still need to exit staging mode.
                self.exit_staging()?;
                return Ok(());
            }
        };
        let expect_change = merged_actions.actions.iter().any(|a| a.is_change());
        let expect_partition_expr_change = merged_actions
            .actions
            .iter()
            .any(|a| a.is_partition_expr_change());
        let expect_edit = merged_actions.actions.iter().any(|a| a.is_edit());
        ensure!(
            !(expect_change && expect_partition_expr_change),
            UnexpectedSnafu {
                reason: "unexpected both change and partition expr change actions in merged actions"
            }
        );
        ensure!(
            expect_change || expect_partition_expr_change,
            UnexpectedSnafu {
                reason: "expect a change or partition expr change action in merged actions"
            }
        );
        ensure!(
            expect_edit,
            UnexpectedSnafu {
                reason: "expect an edit action in merged actions"
            }
        );

        let (merged_partition_expr_change, merged_change, merged_edit) =
            merged_actions.clone().split_region_change_and_edit();
        if let Some(change) = &merged_change {
            // On staging exit we only allow metadata-only updates. A `Change`
            // action is accepted only when column definitions are unchanged;
            // otherwise it is treated as a schema change and rejected.
            let current_column_metadatas = &self.version().metadata.column_metadatas;
            ensure!(
                change.metadata.column_metadatas == *current_column_metadatas,
                UnexpectedSnafu {
                    reason: "change action alters column metadata in staging exit"
                }
            );
        }

        // Submit merged actions using the manifest manager's update method.
        // Pass `false` so the manifest is saved to the normal directory, not the staging one.
        let new_version = manager.update(merged_actions, false).await?;
        info!(
            "Successfully submitted merged staged manifests for region {}, new version: {}",
            self.region_id, new_version
        );

        // Apply the merged changes to the in-memory version control.
        if let Some(change) = merged_partition_expr_change {
            let mut new_metadata = self.version().metadata.as_ref().clone();
            new_metadata.set_partition_expr(change.partition_expr);
            self.version_control.alter_metadata(new_metadata.into());
        }
        if let Some(change) = merged_change {
            self.version_control.alter_metadata(change.metadata);
        }
        self.version_control
            .apply_edit(Some(merged_edit), &[], self.file_purger.clone());

        // Clear all staging manifests and transition the state.
        if let Err(e) = manager.clear_staging_manifest_and_dir().await {
            error!(e; "Failed to clear staging manifest dir for region {}", self.region_id);
        }
        self.exit_staging()?;

        Ok(())
    }

    /// Returns the partition expression string for this region.
    ///
    /// If the region is currently in staging state, this returns the partition expression held in
    /// the staging partition field. Otherwise, it returns the partition expression from the primary
    /// region metadata (current committed version).
    pub fn maybe_staging_partition_expr_str(&self) -> Option<String> {
        let is_staging = self.is_staging();
        if is_staging {
            let staging_partition_info = self.staging_partition_info.lock().unwrap();
            if staging_partition_info.is_none() {
                warn!(
                    "Staging partition expr is none for region {} in staging state",
                    self.region_id
                );
            }
            staging_partition_info
                .as_ref()
                .and_then(|info| info.partition_expr().map(ToString::to_string))
        } else {
            let version = self.version();
            version.metadata.partition_expr.clone()
        }
    }

    pub fn expected_partition_expr_version(&self) -> u64 {
        if self.is_staging() {
            let staging_partition_info = self.staging_partition_info.lock().unwrap();
            staging_partition_info
                .as_ref()
                .map(|info| info.partition_rule_version)
                .unwrap_or_default()
        } else {
            self.version().metadata.partition_expr_version
        }
    }

    /// Returns whether writes should be rejected for this region in staging mode.
    pub(crate) fn reject_all_writes_in_staging(&self) -> bool {
        if !self.is_staging() {
            return false;
        }
        let staging_partition_info = self.staging_partition_info.lock().unwrap();
        staging_partition_info
            .as_ref()
            .map(|info| {
                matches!(
                    info.partition_directive,
                    StagingPartitionDirective::RejectAllWrites
                )
            })
            .unwrap_or(false)
    }
}

/// Context to update the region manifest.
#[derive(Debug)]
pub(crate) struct ManifestContext {
    /// Manager to maintain manifest for this region.
    pub(crate) manifest_manager: tokio::sync::RwLock<RegionManifestManager>,
    /// The state of the region. The region checks the state before updating
    /// the manifest.
    state: AtomicCell<RegionRoleState>,
}

impl ManifestContext {
    pub(crate) fn new(manager: RegionManifestManager, state: RegionRoleState) -> Self {
        ManifestContext {
            manifest_manager: tokio::sync::RwLock::new(manager),
            state: AtomicCell::new(state),
        }
    }

    pub(crate) async fn manifest_version(&self) -> ManifestVersion {
        self.manifest_manager
            .read()
            .await
            .manifest()
            .manifest_version
    }

    pub(crate) async fn has_update(&self) -> Result<bool> {
        self.manifest_manager.read().await.has_update().await
    }

    /// Returns the current region role state.
    pub(crate) fn current_state(&self) -> RegionRoleState {
        self.state.load()
    }

    /// Installs the manifest changes from the current version to the target version (inclusive).
    ///
    /// Returns the installed [RegionManifest].
    /// **Note**: This function is not guaranteed to install the target version strictly.
    /// The installed version may be greater than the target version.
    pub(crate) async fn install_manifest_to(
        &self,
        version: ManifestVersion,
    ) -> Result<Arc<RegionManifest>> {
        let mut manager = self.manifest_manager.write().await;
        manager.install_manifest_to(version).await?;

        Ok(manager.manifest())
    }

    /// Updates the manifest if the current state is `expect_state`.
    pub(crate) async fn update_manifest(
        &self,
        expect_state: RegionLeaderState,
        action_list: RegionMetaActionList,
        is_staging: bool,
    ) -> Result<ManifestVersion> {
        // Acquires the write lock of the manifest manager.
        let mut manager = self.manifest_manager.write().await;
        // Gets the current manifest.
        let manifest = manager.manifest();
        // Checks state inside the lock. This is to ensure that we won't update the manifest
        // after `set_readonly_gracefully()` is called.
        let current_state = self.state.load();

        // If expect_state is not downgrading, the current state must be either `expect_state` or downgrading.
        //
        // A downgrading leader rejects user writes but still allows
        // flushing the memtable and updating the manifest.
        if expect_state != RegionLeaderState::Downgrading {
            if current_state == RegionRoleState::Leader(RegionLeaderState::Downgrading) {
                info!(
                    "Region {} is in downgrading leader state, updating manifest. state is {:?}",
                    manifest.metadata.region_id, expect_state
                );
            }
            ensure!(
                current_state == RegionRoleState::Leader(expect_state)
                    || current_state == RegionRoleState::Leader(RegionLeaderState::Downgrading),
                UpdateManifestSnafu {
                    region_id: manifest.metadata.region_id,
                    state: current_state,
                }
            );
        } else {
            ensure!(
                current_state == RegionRoleState::Leader(expect_state),
                RegionStateSnafu {
                    region_id: manifest.metadata.region_id,
                    state: current_state,
                    expect: RegionRoleState::Leader(expect_state),
                }
            );
        }

        for action in &action_list.actions {
            // Checks whether the edit is still applicable.
            let RegionMetaAction::Edit(edit) = &action else {
                continue;
            };

            // Checks whether the region is truncated.
            let Some(truncated_entry_id) = manifest.truncated_entry_id else {
                continue;
            };

            // This is an edit from flush.
            if let Some(flushed_entry_id) = edit.flushed_entry_id {
                ensure!(
                    truncated_entry_id < flushed_entry_id,
                    RegionTruncatedSnafu {
                        region_id: manifest.metadata.region_id,
                    }
                );
            }

            // This is an edit from compaction.
            if !edit.files_to_remove.is_empty() {
                // Returns an error if input files of the compaction task have been truncated.
                for file in &edit.files_to_remove {
                    ensure!(
                        manifest.files.contains_key(&file.file_id),
                        RegionTruncatedSnafu {
                            region_id: manifest.metadata.region_id,
                        }
                    );
                }
            }
        }

        // Now we can update the manifest.
        let version = manager.update(action_list, is_staging).await.inspect_err(
            |e| error!(e; "Failed to update manifest, region_id: {}", manifest.metadata.region_id),
        )?;

        if self.state.load() == RegionRoleState::Follower {
            warn!(
                "Region {} becomes follower while updating manifest which may cause inconsistency, manifest version: {version}",
                manifest.metadata.region_id
            );
        }

        Ok(version)
    }

    /// Sets the [`RegionRole`].
    ///
    /// ```text
    ///     +------------------------------------------+
    ///     |                      +-----------------+ |
    ///     |                      |                 | |
    /// +---+------+       +-------+-----+        +--v-v---+
    /// | Follower |       | Downgrading |        | Leader |
    /// +---^-^----+       +-----+-^-----+        +--+-+---+
    ///     | |                  | |                 | |
    ///     | +------------------+ +-----------------+ |
    ///     +------------------------------------------+
    ///
    /// Transition:
    /// - Follower -> Leader
    /// - Downgrading Leader -> Leader
    /// - Leader -> Follower
    /// - Downgrading Leader -> Follower
    /// - Leader -> Downgrading Leader
    ///
    /// ```
    pub(crate) fn set_role(&self, next_role: RegionRole, region_id: RegionId) {
        match next_role {
            RegionRole::Follower => {
                match self.state.fetch_update(|state| {
                    if !matches!(state, RegionRoleState::Follower) {
                        Some(RegionRoleState::Follower)
                    } else {
                        None
                    }
                }) {
                    Ok(state) => info!(
                        "Convert region {} to follower, previous role state: {:?}",
                        region_id, state
                    ),
                    Err(state) => {
                        if state != RegionRoleState::Follower {
                            warn!(
                                "Failed to convert region {} to follower, current role state: {:?}",
                                region_id, state
                            )
                        }
                    }
                }
            }
            RegionRole::Leader => {
                match self.state.fetch_update(|state| {
                    if matches!(
                        state,
                        RegionRoleState::Follower
                            | RegionRoleState::Leader(RegionLeaderState::Downgrading)
                    ) {
                        Some(RegionRoleState::Leader(RegionLeaderState::Writable))
                    } else {
                        None
                    }
                }) {
                    Ok(state) => info!(
                        "Convert region {} to leader, previous role state: {:?}",
                        region_id, state
                    ),
                    Err(state) => {
                        if state != RegionRoleState::Leader(RegionLeaderState::Writable) {
                            warn!(
                                "Failed to convert region {} to leader, current role state: {:?}",
                                region_id, state
                            )
                        }
                    }
                }
            }
            RegionRole::DowngradingLeader => {
                match self.state.compare_exchange(
                    RegionRoleState::Leader(RegionLeaderState::Writable),
                    RegionRoleState::Leader(RegionLeaderState::Downgrading),
                ) {
                    Ok(state) => info!(
                        "Convert region {} to downgrading region, previous role state: {:?}",
                        region_id, state
                    ),
                    Err(state) => {
                        if state != RegionRoleState::Leader(RegionLeaderState::Downgrading) {
                            warn!(
                                "Failed to convert region {} to downgrading leader, current role state: {:?}",
                                region_id, state
                            )
                        }
                    }
                }
            }
        }
    }

    /// Returns the normal manifest of the region.
    pub(crate) async fn manifest(&self) -> Arc<crate::manifest::action::RegionManifest> {
        self.manifest_manager.read().await.manifest()
    }

    /// Returns the staging manifest of the region.
    pub(crate) async fn staging_manifest(
        &self,
    ) -> Option<Arc<crate::manifest::action::RegionManifest>> {
        self.manifest_manager.read().await.staging_manifest()
    }
}

pub(crate) type ManifestContextRef = Arc<ManifestContext>;

/// Regions indexed by ids.
#[derive(Debug, Default)]
pub(crate) struct RegionMap {
    regions: RwLock<HashMap<RegionId, MitoRegionRef>>,
}

impl RegionMap {
    /// Returns true if the region exists.
    pub(crate) fn is_region_exists(&self, region_id: RegionId) -> bool {
        let regions = self.regions.read().unwrap();
        regions.contains_key(&region_id)
    }

    /// Inserts a new region into the map.
    pub(crate) fn insert_region(&self, region: MitoRegionRef) {
        let mut regions = self.regions.write().unwrap();
        regions.insert(region.region_id, region);
    }

    /// Gets region by region id.
    pub(crate) fn get_region(&self, region_id: RegionId) -> Option<MitoRegionRef> {
        let regions = self.regions.read().unwrap();
        regions.get(&region_id).cloned()
    }

    /// Gets writable region by region id.
    ///
    /// Returns an error if the region does not exist or is readonly.
    pub(crate) fn writable_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
        let region = self
            .get_region(region_id)
            .context(RegionNotFoundSnafu { region_id })?;
        ensure!(
            region.is_writable(),
            RegionStateSnafu {
                region_id,
                state: region.state(),
                expect: RegionRoleState::Leader(RegionLeaderState::Writable),
            }
        );
        Ok(region)
    }

    /// Gets readonly region by region id.
    ///
    /// Returns an error if the region does not exist or is writable.
    pub(crate) fn follower_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
        let region = self
            .get_region(region_id)
            .context(RegionNotFoundSnafu { region_id })?;
        ensure!(
            region.is_follower(),
            RegionStateSnafu {
                region_id,
                state: region.state(),
                expect: RegionRoleState::Follower,
            }
        );

        Ok(region)
    }

    /// Gets region by region id.
    ///
    /// Calls the callback if the region does not exist.
    pub(crate) fn get_region_or<F: OnFailure>(
        &self,
        region_id: RegionId,
        cb: &mut F,
    ) -> Option<MitoRegionRef> {
        match self
            .get_region(region_id)
            .context(RegionNotFoundSnafu { region_id })
        {
            Ok(region) => Some(region),
            Err(e) => {
                cb.on_failure(e);
                None
            }
        }
    }

    /// Gets writable region by region id.
    ///
    /// Calls the callback if the region does not exist or is readonly.
    pub(crate) fn writable_region_or<F: OnFailure>(
        &self,
        region_id: RegionId,
        cb: &mut F,
    ) -> Option<MitoRegionRef> {
        match self.writable_region(region_id) {
            Ok(region) => Some(region),
            Err(e) => {
                cb.on_failure(e);
                None
            }
        }
    }

    /// Gets writable non-staging region by region id.
    ///
    /// Returns an error if the region does not exist, is readonly, or is in staging mode.
    pub(crate) fn writable_non_staging_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
        let region = self.writable_region(region_id)?;
        if region.is_staging() {
            return Err(crate::error::RegionStateSnafu {
                region_id,
                state: region.state(),
                expect: RegionRoleState::Leader(RegionLeaderState::Writable),
            }
            .build());
        }
        Ok(region)
    }

    /// Gets staging region by region id.
    ///
    /// Returns an error if the region does not exist or is not in staging state.
    pub(crate) fn staging_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
        let region = self
            .get_region(region_id)
            .context(RegionNotFoundSnafu { region_id })?;
        ensure!(
            region.is_staging(),
            RegionStateSnafu {
                region_id,
                state: region.state(),
                expect: RegionRoleState::Leader(RegionLeaderState::Staging),
            }
        );
        Ok(region)
    }

    /// Gets flushable region by region id.
    ///
    /// Returns an error if the region does not exist.
    /// Returns `None` if the region exists but is not flushable.
    fn flushable_region(&self, region_id: RegionId) -> Result<Option<MitoRegionRef>> {
        let region = self
            .get_region(region_id)
            .context(RegionNotFoundSnafu { region_id })?;
        if region.is_flushable() {
            Ok(Some(region))
        } else {
            Ok(None)
        }
    }

    /// Gets flushable region by region id.
    ///
    /// Calls the callback if the region does not exist.
    /// Returns `None` if the region exists but is not flushable.
    pub(crate) fn flushable_region_or<F: OnFailure>(
        &self,
        region_id: RegionId,
        cb: &mut F,
    ) -> Option<MitoRegionRef> {
        match self.flushable_region(region_id) {
            Ok(region) => region,
            Err(e) => {
                cb.on_failure(e);
                None
            }
        }
    }

    /// Remove region by id.
    pub(crate) fn remove_region(&self, region_id: RegionId) -> Option<MitoRegionRef> {
        let mut regions = self.regions.write().unwrap();
        regions.remove(&region_id)
    }

    /// List all regions.
    pub(crate) fn list_regions(&self) -> Vec<MitoRegionRef> {
        let regions = self.regions.read().unwrap();
        regions.values().cloned().collect()
    }

    /// Clear the map.
    pub(crate) fn clear(&self) {
        self.regions.write().unwrap().clear();
    }
}

pub(crate) type RegionMapRef = Arc<RegionMap>;

/// Opening regions.
#[derive(Debug, Default)]
pub(crate) struct OpeningRegions {
    regions: RwLock<HashMap<RegionId, Vec<OptionOutputTx>>>,
}

impl OpeningRegions {
    /// Registers `sender` for an opening region and returns `None` if the region
    /// is already being opened; otherwise, returns the `sender` back to the caller.
    pub(crate) fn wait_for_opening_region(
        &self,
        region_id: RegionId,
        sender: OptionOutputTx,
    ) -> Option<OptionOutputTx> {
        let mut regions = self.regions.write().unwrap();
        match regions.entry(region_id) {
            Entry::Occupied(mut senders) => {
                senders.get_mut().push(sender);
                None
            }
            Entry::Vacant(_) => Some(sender),
        }
    }

    /// Returns true if the region exists.
    pub(crate) fn is_region_exists(&self, region_id: RegionId) -> bool {
        let regions = self.regions.read().unwrap();
        regions.contains_key(&region_id)
    }

    /// Inserts a new region into the map.
    pub(crate) fn insert_sender(&self, region: RegionId, sender: OptionOutputTx) {
        let mut regions = self.regions.write().unwrap();
        regions.insert(region, vec![sender]);
    }

    /// Remove region by id.
    pub(crate) fn remove_sender(&self, region_id: RegionId) -> Vec<OptionOutputTx> {
        let mut regions = self.regions.write().unwrap();
        regions.remove(&region_id).unwrap_or_default()
    }

    #[cfg(test)]
    pub(crate) fn sender_len(&self, region_id: RegionId) -> usize {
        let regions = self.regions.read().unwrap();
        if let Some(senders) = regions.get(&region_id) {
            senders.len()
        } else {
            0
        }
    }
}
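
// Usage sketch (assumed worker flow, not defined in this file): a worker first
// calls `wait_for_opening_region`; if it gets the sender back, no open is in
// flight, so it registers itself via `insert_sender` and performs the open.
// When the open completes, it drains `remove_sender` and notifies every
// waiting sender.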

pub(crate) type OpeningRegionsRef = Arc<OpeningRegions>;

/// The regions that are catching up.
#[derive(Debug, Default)]
pub(crate) struct CatchupRegions {
    regions: RwLock<HashSet<RegionId>>,
}

impl CatchupRegions {
    /// Returns true if the region exists.
    pub(crate) fn is_region_exists(&self, region_id: RegionId) -> bool {
        let regions = self.regions.read().unwrap();
        regions.contains(&region_id)
    }

    /// Inserts a new region into the set.
    pub(crate) fn insert_region(&self, region_id: RegionId) {
        let mut regions = self.regions.write().unwrap();
        regions.insert(region_id);
    }

    /// Remove region by id.
    pub(crate) fn remove_region(&self, region_id: RegionId) {
        let mut regions = self.regions.write().unwrap();
        regions.remove(&region_id);
    }
}

pub(crate) type CatchupRegionsRef = Arc<CatchupRegions>;

/// Manifest stats.
#[derive(Default, Debug, Clone)]
pub struct ManifestStats {
    pub(crate) total_manifest_size: Arc<AtomicU64>,
    pub(crate) manifest_version: Arc<AtomicU64>,
    pub(crate) file_removed_cnt: Arc<AtomicU64>,
}

impl ManifestStats {
    fn total_manifest_size(&self) -> u64 {
        self.total_manifest_size.load(Ordering::Relaxed)
    }

    fn manifest_version(&self) -> u64 {
        self.manifest_version.load(Ordering::Relaxed)
    }

    fn file_removed_cnt(&self) -> u64 {
        self.file_removed_cnt.load(Ordering::Relaxed)
    }
}

/// Parses the partition expression from a JSON string.
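///
/// A minimal sketch of the contract (marked `ignore`, not compiled as a doctest):
/// `None` and `Some("")` both yield `Ok(None)`; any other string is parsed as JSON.
///
/// ```ignore
/// assert!(parse_partition_expr(None).unwrap().is_none());
/// assert!(parse_partition_expr(Some("")).unwrap().is_none());
/// ```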
1407pub fn parse_partition_expr(partition_expr_str: Option<&str>) -> Result<Option<PartitionExpr>> {
1408    match partition_expr_str {
1409        None => Ok(None),
1410        Some("") => Ok(None),
1411        Some(json_str) => {
1412            let expr = partition::expr::PartitionExpr::from_json_str(json_str)
1413                .with_context(|_| InvalidPartitionExprSnafu { expr: json_str })?;
1414            Ok(expr)
1415        }
1416    }
1417}
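
// A hedged sketch of `parse_partition_expr` (not part of the original
// source): `None` and the empty string both mean "no partition expression",
// while anything else must be a valid JSON-encoded `PartitionExpr`. The
// `"not-json"` literal is an illustrative invalid input.
#[cfg(test)]
#[allow(dead_code)]
fn parse_partition_expr_sketch() {
    assert!(parse_partition_expr(None).unwrap().is_none());
    assert!(parse_partition_expr(Some("")).unwrap().is_none());
    // Malformed JSON is expected to surface as an `InvalidPartitionExpr` error.
    assert!(parse_partition_expr(Some("not-json")).is_err());
}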

#[cfg(test)]
mod tests {
    use std::sync::atomic::AtomicU64;
    use std::sync::{Arc, Mutex};

    use common_datasource::compression::CompressionType;
    use common_test_util::temp_dir::create_temp_dir;
    use crossbeam_utils::atomic::AtomicCell;
    use object_store::ObjectStore;
    use object_store::services::Fs;
    use store_api::logstore::provider::Provider;
    use store_api::region_engine::RegionRole;
    use store_api::region_request::PathType;
    use store_api::storage::RegionId;

    use crate::access_layer::AccessLayer;
    use crate::error::Error;
    use crate::manifest::action::{
        RegionChange, RegionEdit, RegionMetaAction, RegionMetaActionList, RegionPartitionExprChange,
    };
    use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
    use crate::region::{
        ManifestContext, ManifestStats, MitoRegion, RegionLeaderState, RegionRoleState,
    };
    use crate::sst::FormatType;
    use crate::sst::index::intermediate::IntermediateManager;
    use crate::sst::index::puffin_manager::PuffinManagerFactory;
    use crate::test_util::scheduler_util::SchedulerEnv;
    use crate::test_util::version_util::VersionControlBuilder;
    use crate::time_provider::StdTimeProvider;

    #[test]
    fn test_region_state_lock_free() {
        // The region role state is kept in an `AtomicCell`; it must be
        // lock-free so that loading and storing the state never blocks.
        assert!(AtomicCell::<RegionRoleState>::is_lock_free());
    }

    async fn build_test_region(env: &SchedulerEnv) -> MitoRegion {
        let builder = VersionControlBuilder::new();
        let version_control = Arc::new(builder.build());
        let metadata = version_control.current().version.metadata.clone();

        let manager = RegionManifestManager::new(
            metadata.clone(),
            0,
            RegionManifestOptions {
                manifest_dir: "".to_string(),
                object_store: env.access_layer.object_store().clone(),
                compress_type: CompressionType::Uncompressed,
                checkpoint_distance: 10,
                remove_file_options: Default::default(),
                manifest_cache: None,
            },
            FormatType::PrimaryKey,
            &Default::default(),
        )
        .await
        .unwrap();

        let manifest_ctx = Arc::new(ManifestContext::new(
            manager,
            RegionRoleState::Leader(RegionLeaderState::Writable),
        ));

        MitoRegion {
            region_id: metadata.region_id,
            version_control,
            access_layer: env.access_layer.clone(),
            manifest_ctx,
            file_purger: crate::test_util::new_noop_file_purger(),
            provider: Provider::noop_provider(),
            last_flush_millis: Default::default(),
            last_compaction_millis: Default::default(),
            time_provider: Arc::new(StdTimeProvider),
            topic_latest_entry_id: Default::default(),
            written_bytes: Arc::new(AtomicU64::new(0)),
            stats: ManifestStats::default(),
            staging_partition_info: Mutex::new(None),
        }
    }

    fn empty_edit() -> RegionEdit {
        RegionEdit {
            files_to_add: Vec::new(),
            files_to_remove: Vec::new(),
            timestamp_ms: None,
            compaction_time_window: None,
            flushed_entry_id: None,
            flushed_sequence: None,
            committed_sequence: None,
        }
    }

    #[tokio::test]
    async fn test_exit_staging_partition_expr_change_and_edit_success() {
        let env = SchedulerEnv::new().await;
        let region = build_test_region(&env).await;

        let mut manager = region.manifest_ctx.manifest_manager.write().await;
        region.set_staging(&mut manager).await.unwrap();
        manager
            .update(
                RegionMetaActionList::new(vec![
                    RegionMetaAction::PartitionExprChange(RegionPartitionExprChange {
                        partition_expr: Some("expr_a".to_string()),
                    }),
                    RegionMetaAction::Edit(empty_edit()),
                ]),
                true,
            )
            .await
            .unwrap();

        region.exit_staging_on_success(&mut manager).await.unwrap();
        drop(manager);

        assert_eq!(
            region.version().metadata.partition_expr.as_deref(),
            Some("expr_a")
        );
        assert_eq!(
            region.state(),
            RegionRoleState::Leader(RegionLeaderState::Writable)
        );
    }

    #[tokio::test]
    async fn test_exit_staging_change_with_same_columns_success() {
        let env = SchedulerEnv::new().await;
        let region = build_test_region(&env).await;

        let mut manager = region.manifest_ctx.manifest_manager.write().await;
        region.set_staging(&mut manager).await.unwrap();

        let mut changed_metadata = region.version().metadata.as_ref().clone();
        changed_metadata.set_partition_expr(Some("expr_b".to_string()));

        manager
            .update(
                RegionMetaActionList::new(vec![
                    RegionMetaAction::Change(RegionChange {
                        metadata: Arc::new(changed_metadata),
                        sst_format: FormatType::PrimaryKey,
                        append_mode: None,
                    }),
                    RegionMetaAction::Edit(empty_edit()),
                ]),
                true,
            )
            .await
            .unwrap();

        region.exit_staging_on_success(&mut manager).await.unwrap();
        drop(manager);

        assert_eq!(
            region.version().metadata.partition_expr.as_deref(),
            Some("expr_b")
        );
        assert_eq!(
            region.state(),
            RegionRoleState::Leader(RegionLeaderState::Writable)
        );
    }

    #[tokio::test]
    async fn test_exit_staging_change_with_different_columns_fails() {
        let env = SchedulerEnv::new().await;
        let region = build_test_region(&env).await;

        let mut manager = region.manifest_ctx.manifest_manager.write().await;
        region.set_staging(&mut manager).await.unwrap();

        let mut changed_metadata = region.version().metadata.as_ref().clone();
        changed_metadata.column_metadatas.rotate_left(1);

        manager
            .update(
                RegionMetaActionList::new(vec![
                    RegionMetaAction::Change(RegionChange {
                        metadata: Arc::new(changed_metadata),
                        sst_format: FormatType::PrimaryKey,
                        append_mode: None,
                    }),
                    RegionMetaAction::Edit(empty_edit()),
                ]),
                true,
            )
            .await
            .unwrap();

        let result = region.exit_staging_on_success(&mut manager).await;
        assert!(matches!(result, Err(Error::Unexpected { .. })));
    }

    #[tokio::test]
    async fn test_exit_staging_partition_expr_change_and_change_conflict_fails() {
        let env = SchedulerEnv::new().await;
        let region = build_test_region(&env).await;

        let mut manager = region.manifest_ctx.manifest_manager.write().await;
        region.set_staging(&mut manager).await.unwrap();

        let mut changed_metadata = region.version().metadata.as_ref().clone();
        changed_metadata.set_partition_expr(Some("expr_c".to_string()));

        manager
            .update(
                RegionMetaActionList::new(vec![
                    RegionMetaAction::PartitionExprChange(RegionPartitionExprChange {
                        partition_expr: Some("expr_c".to_string()),
                    }),
                    RegionMetaAction::Change(RegionChange {
                        metadata: Arc::new(changed_metadata),
                        sst_format: FormatType::PrimaryKey,
                        append_mode: None,
                    }),
                    RegionMetaAction::Edit(empty_edit()),
                ]),
                true,
            )
            .await
            .unwrap();

        let result = region.exit_staging_on_success(&mut manager).await;
        assert!(matches!(result, Err(Error::Unexpected { .. })));
    }

    #[tokio::test]
    async fn test_set_region_state() {
        let env = SchedulerEnv::new().await;
        let builder = VersionControlBuilder::new();
        let version_control = Arc::new(builder.build());
        let manifest_ctx = env
            .mock_manifest_context(version_control.current().version.metadata.clone())
            .await;

        let region_id = RegionId::new(1024, 0);
        // Leader -> Follower
        manifest_ctx.set_role(RegionRole::Follower, region_id);
        assert_eq!(manifest_ctx.state.load(), RegionRoleState::Follower);

        // Follower -> Leader
        manifest_ctx.set_role(RegionRole::Leader, region_id);
        assert_eq!(
            manifest_ctx.state.load(),
            RegionRoleState::Leader(RegionLeaderState::Writable)
        );

        // Leader -> Downgrading Leader
        manifest_ctx.set_role(RegionRole::DowngradingLeader, region_id);
        assert_eq!(
            manifest_ctx.state.load(),
            RegionRoleState::Leader(RegionLeaderState::Downgrading)
        );

        // Downgrading Leader -> Follower
        manifest_ctx.set_role(RegionRole::Follower, region_id);
        assert_eq!(manifest_ctx.state.load(), RegionRoleState::Follower);

        // Can't downgrade from follower (Follower -> Downgrading Leader)
        manifest_ctx.set_role(RegionRole::DowngradingLeader, region_id);
        assert_eq!(manifest_ctx.state.load(), RegionRoleState::Follower);

        // Set region role to Downgrading Leader
        manifest_ctx.set_role(RegionRole::Leader, region_id);
        manifest_ctx.set_role(RegionRole::DowngradingLeader, region_id);
        assert_eq!(
            manifest_ctx.state.load(),
            RegionRoleState::Leader(RegionLeaderState::Downgrading)
        );

        // Downgrading Leader -> Leader
        manifest_ctx.set_role(RegionRole::Leader, region_id);
        assert_eq!(
            manifest_ctx.state.load(),
            RegionRoleState::Leader(RegionLeaderState::Writable)
        );
    }

    #[tokio::test]
    async fn test_staging_state_validation() {
        let env = SchedulerEnv::new().await;
        let builder = VersionControlBuilder::new();
        let version_control = Arc::new(builder.build());

        // Create context with staging state using the correct pattern from SchedulerEnv
        let staging_ctx = {
            let manager = RegionManifestManager::new(
                version_control.current().version.metadata.clone(),
                0,
                RegionManifestOptions {
                    manifest_dir: "".to_string(),
                    object_store: env.access_layer.object_store().clone(),
                    compress_type: CompressionType::Uncompressed,
                    checkpoint_distance: 10,
                    remove_file_options: Default::default(),
                    manifest_cache: None,
                },
                FormatType::PrimaryKey,
                &Default::default(),
            )
            .await
            .unwrap();
            Arc::new(ManifestContext::new(
                manager,
                RegionRoleState::Leader(RegionLeaderState::Staging),
            ))
        };

        // Test staging state behavior
        assert_eq!(
            staging_ctx.current_state(),
            RegionRoleState::Leader(RegionLeaderState::Staging)
        );

        // Test writable context for comparison
        let writable_ctx = env
            .mock_manifest_context(version_control.current().version.metadata.clone())
            .await;

        assert_eq!(
            writable_ctx.current_state(),
            RegionRoleState::Leader(RegionLeaderState::Writable)
        );
    }

    #[tokio::test]
    async fn test_staging_state_transitions() {
        let builder = VersionControlBuilder::new();
        let version_control = Arc::new(builder.build());
        let metadata = version_control.current().version.metadata.clone();

        // Create MitoRegion for testing state transitions
        let temp_dir = create_temp_dir("");
        let path_str = temp_dir.path().display().to_string();
        let fs_builder = Fs::default().root(&path_str);
        let object_store = ObjectStore::new(fs_builder).unwrap().finish();

        let index_aux_path = temp_dir.path().join("index_aux");
        let puffin_mgr = PuffinManagerFactory::new(&index_aux_path, 4096, None, None)
            .await
            .unwrap();
        let intm_mgr = IntermediateManager::init_fs(index_aux_path.to_str().unwrap())
            .await
            .unwrap();

        let access_layer = Arc::new(AccessLayer::new(
            "",
            PathType::Bare,
            object_store,
            puffin_mgr,
            intm_mgr,
        ));

        let manager = RegionManifestManager::new(
            metadata.clone(),
            0,
            RegionManifestOptions {
                manifest_dir: "".to_string(),
                object_store: access_layer.object_store().clone(),
                compress_type: CompressionType::Uncompressed,
                checkpoint_distance: 10,
                remove_file_options: Default::default(),
                manifest_cache: None,
            },
            FormatType::PrimaryKey,
            &Default::default(),
        )
        .await
        .unwrap();

        let manifest_ctx = Arc::new(ManifestContext::new(
            manager,
            RegionRoleState::Leader(RegionLeaderState::Writable),
        ));

        let region = MitoRegion {
            region_id: metadata.region_id,
            version_control,
            access_layer,
            manifest_ctx: manifest_ctx.clone(),
            file_purger: crate::test_util::new_noop_file_purger(),
            provider: Provider::noop_provider(),
            last_flush_millis: Default::default(),
            last_compaction_millis: Default::default(),
            time_provider: Arc::new(StdTimeProvider),
            topic_latest_entry_id: Default::default(),
            written_bytes: Arc::new(AtomicU64::new(0)),
            stats: ManifestStats::default(),
            staging_partition_info: Mutex::new(None),
        };

        // Test initial state
        assert_eq!(
            region.state(),
            RegionRoleState::Leader(RegionLeaderState::Writable)
        );
        assert!(!region.is_staging());

        // Test transition to staging
        let mut manager = manifest_ctx.manifest_manager.write().await;
        region.set_staging(&mut manager).await.unwrap();
        drop(manager);
        assert_eq!(
            region.state(),
            RegionRoleState::Leader(RegionLeaderState::Staging)
        );
        assert!(region.is_staging());

        // Test transition back to writable
        region.exit_staging().unwrap();
        assert_eq!(
            region.state(),
            RegionRoleState::Leader(RegionLeaderState::Writable)
        );
        assert!(!region.is_staging());

        // Test staging directory cleanup: Create dirty staging files before entering staging mode
        {
            // Create some dummy staging manifest files to simulate an interrupted session
            let manager = manifest_ctx.manifest_manager.write().await;
            let dummy_actions = RegionMetaActionList::new(vec![]);
            let dummy_bytes = dummy_actions.encode().unwrap();

            // Create dirty staging files with versions 100 and 101
            manager.store().save(100, &dummy_bytes, true).await.unwrap();
            manager.store().save(101, &dummy_bytes, true).await.unwrap();
            drop(manager);

            // Verify dirty files exist before entering staging
            let manager = manifest_ctx.manifest_manager.read().await;
            let dirty_manifests = manager.store().fetch_staging_manifests().await.unwrap();
            assert_eq!(
                dirty_manifests.len(),
                2,
                "Should have 2 dirty staging files"
            );
            drop(manager);

            // Enter staging mode - this should clean up the dirty files
            let mut manager = manifest_ctx.manifest_manager.write().await;
            region.set_staging(&mut manager).await.unwrap();
            drop(manager);

            // Verify dirty files are cleaned up after entering staging
            let manager = manifest_ctx.manifest_manager.read().await;
            let cleaned_manifests = manager.store().fetch_staging_manifests().await.unwrap();
            assert_eq!(
                cleaned_manifests.len(),
                0,
                "Dirty staging files should be cleaned up"
            );
            drop(manager);

            // Exit staging to restore normal state for remaining tests
            region.exit_staging().unwrap();
        }

        // Test invalid transitions
        let mut manager = manifest_ctx.manifest_manager.write().await;
        assert!(region.set_staging(&mut manager).await.is_ok()); // Writable -> Staging should work
        drop(manager);
        let mut manager = manifest_ctx.manifest_manager.write().await;
        assert!(region.set_staging(&mut manager).await.is_err()); // Staging -> Staging should fail
        drop(manager);
        assert!(region.exit_staging().is_ok()); // Staging -> Writable should work
        assert!(region.exit_staging().is_err()); // Writable -> Writable should fail
    }
}