Skip to main content

store_api/
region_engine.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Region Engine's definition
16
use std::any::Any;
use std::collections::HashMap;
use std::fmt::{Debug, Display};
use std::sync::{Arc, Mutex};

use api::greptime_proto::v1::meta::{GrantedRegion as PbGrantedRegion, RegionRole as PbRegionRole};
use api::region::RegionResponse;
use async_trait::async_trait;
use common_error::ext::BoxedError;
use common_recordbatch::{EmptyRecordBatchStream, QueryMemoryTracker, SendableRecordBatchStream};
use common_time::Timestamp;
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion_physical_plan::{DisplayAs, DisplayFormatType, PhysicalExpr};
use datatypes::schema::SchemaRef;
use futures::future::join_all;
use serde::{Deserialize, Serialize};
use tokio::sync::Semaphore;

use crate::logstore::entry;
use crate::metadata::RegionMetadataRef;
use crate::region_request::{
    BatchRegionDdlRequest, RegionCatchupRequest, RegionOpenRequest, RegionRequest,
};
use crate::storage::{FileId, RegionId, ScanRequest, SequenceNumber};
41
/// The settable region role state.
///
/// Each variant converts into the [`RegionRole`] variant of the same name.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum SettableRegionRoleState {
    /// Switch the region to the follower role.
    Follower,
    /// Switch the region to the downgrading-leader role.
    DowngradingLeader,
    /// Exit staging mode and return to normal leader state. Only allowed from staging state.
    Leader,
    /// Enter staging mode. Region remains writable but disables checkpoint and compaction.
    /// Only allowed from normal leader state.
    StagingLeader,
}
52
53impl Display for SettableRegionRoleState {
54    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
55        match self {
56            SettableRegionRoleState::Follower => write!(f, "Follower"),
57            SettableRegionRoleState::DowngradingLeader => write!(f, "Leader(Downgrading)"),
58            SettableRegionRoleState::Leader => write!(f, "Leader"),
59            SettableRegionRoleState::StagingLeader => write!(f, "Leader(Staging)"),
60        }
61    }
62}
63
64impl From<SettableRegionRoleState> for RegionRole {
65    fn from(value: SettableRegionRoleState) -> Self {
66        match value {
67            SettableRegionRoleState::Follower => RegionRole::Follower,
68            SettableRegionRoleState::DowngradingLeader => RegionRole::DowngradingLeader,
69            SettableRegionRoleState::Leader => RegionRole::Leader,
70            SettableRegionRoleState::StagingLeader => RegionRole::StagingLeader,
71        }
72    }
73}
74
75/// The request to set region role state.
76#[derive(Debug, PartialEq, Eq)]
77pub struct SetRegionRoleStateRequest {
78    region_id: RegionId,
79    region_role_state: SettableRegionRoleState,
80}
81
82/// The success response of setting region role state.
83#[derive(Debug, PartialEq, Eq)]
84pub enum SetRegionRoleStateSuccess {
85    File,
86    Mito {
87        last_entry_id: entry::Id,
88    },
89    Metric {
90        last_entry_id: entry::Id,
91        metadata_last_entry_id: entry::Id,
92    },
93}
94
95impl SetRegionRoleStateSuccess {
96    /// Returns a [SetRegionRoleStateSuccess::File].
97    pub fn file() -> Self {
98        Self::File
99    }
100
101    /// Returns a [SetRegionRoleStateSuccess::Mito] with the `last_entry_id`.
102    pub fn mito(last_entry_id: entry::Id) -> Self {
103        SetRegionRoleStateSuccess::Mito { last_entry_id }
104    }
105
106    /// Returns a [SetRegionRoleStateSuccess::Metric] with the `last_entry_id` and `metadata_last_entry_id`.
107    pub fn metric(last_entry_id: entry::Id, metadata_last_entry_id: entry::Id) -> Self {
108        SetRegionRoleStateSuccess::Metric {
109            last_entry_id,
110            metadata_last_entry_id,
111        }
112    }
113}
114
115impl SetRegionRoleStateSuccess {
116    /// Returns the last entry id of the region.
117    pub fn last_entry_id(&self) -> Option<entry::Id> {
118        match self {
119            SetRegionRoleStateSuccess::File => None,
120            SetRegionRoleStateSuccess::Mito { last_entry_id } => Some(*last_entry_id),
121            SetRegionRoleStateSuccess::Metric { last_entry_id, .. } => Some(*last_entry_id),
122        }
123    }
124
125    /// Returns the last entry id of the metadata of the region.
126    pub fn metadata_last_entry_id(&self) -> Option<entry::Id> {
127        match self {
128            SetRegionRoleStateSuccess::File => None,
129            SetRegionRoleStateSuccess::Mito { .. } => None,
130            SetRegionRoleStateSuccess::Metric {
131                metadata_last_entry_id,
132                ..
133            } => Some(*metadata_last_entry_id),
134        }
135    }
136}
137
138/// The response of setting region role state.
139#[derive(Debug)]
140pub enum SetRegionRoleStateResponse {
141    Success(SetRegionRoleStateSuccess),
142    NotFound,
143    InvalidTransition(BoxedError),
144}
145
146impl SetRegionRoleStateResponse {
147    /// Returns a [SetRegionRoleStateResponse::Success] with the `File` success.
148    pub fn success(success: SetRegionRoleStateSuccess) -> Self {
149        Self::Success(success)
150    }
151
152    /// Returns a [SetRegionRoleStateResponse::InvalidTransition] with the error.
153    pub fn invalid_transition(error: BoxedError) -> Self {
154        Self::InvalidTransition(error)
155    }
156
157    /// Returns true if the response is a [SetRegionRoleStateResponse::NotFound].
158    pub fn is_not_found(&self) -> bool {
159        matches!(self, SetRegionRoleStateResponse::NotFound)
160    }
161
162    /// Returns true if the response is a [SetRegionRoleStateResponse::InvalidTransition].
163    pub fn is_invalid_transition(&self) -> bool {
164        matches!(self, SetRegionRoleStateResponse::InvalidTransition(_))
165    }
166}
167
168#[derive(Debug, Clone, PartialEq, Eq)]
169pub struct GrantedRegion {
170    pub region_id: RegionId,
171    pub region_role: RegionRole,
172    pub extensions: HashMap<String, Vec<u8>>,
173}
174
175impl GrantedRegion {
176    pub fn new(region_id: RegionId, region_role: RegionRole) -> Self {
177        Self {
178            region_id,
179            region_role,
180            extensions: HashMap::new(),
181        }
182    }
183}
184
185impl From<GrantedRegion> for PbGrantedRegion {
186    fn from(value: GrantedRegion) -> Self {
187        PbGrantedRegion {
188            region_id: value.region_id.as_u64(),
189            role: PbRegionRole::from(value.region_role).into(),
190            extensions: value.extensions,
191        }
192    }
193}
194
195impl From<PbGrantedRegion> for GrantedRegion {
196    fn from(value: PbGrantedRegion) -> Self {
197        GrantedRegion {
198            region_id: RegionId::from_u64(value.region_id),
199            region_role: value.role().into(),
200            extensions: value.extensions,
201        }
202    }
203}
204
205/// The role of the region.
206/// TODO(weny): rename it to `RegionRoleState`
207#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
208pub enum RegionRole {
209    // Readonly region(mito2)
210    Follower,
211    // Writable region(mito2), Readonly region(file).
212    Leader,
213    // Leader is in staging mode.
214    //
215    // This is leader-like and writable, but it follows the staging workflow
216    // semantics instead of a normal leader's steady state.
217    StagingLeader,
218    // Leader is downgrading to follower.
219    //
220    // This state is used to prevent new write requests.
221    DowngradingLeader,
222}
223
224impl Display for RegionRole {
225    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
226        match self {
227            RegionRole::Follower => write!(f, "Follower"),
228            RegionRole::Leader => write!(f, "Leader"),
229            RegionRole::StagingLeader => write!(f, "Leader(Staging)"),
230            RegionRole::DowngradingLeader => write!(f, "Leader(Downgrading)"),
231        }
232    }
233}
234
235impl RegionRole {
236    pub fn writable(&self) -> bool {
237        matches!(self, RegionRole::Leader | RegionRole::StagingLeader)
238    }
239}
240
241impl From<RegionRole> for PbRegionRole {
242    fn from(value: RegionRole) -> Self {
243        match value {
244            RegionRole::Follower => PbRegionRole::Follower,
245            RegionRole::Leader => PbRegionRole::Leader,
246            RegionRole::StagingLeader => PbRegionRole::StagingLeader,
247            RegionRole::DowngradingLeader => PbRegionRole::DowngradingLeader,
248        }
249    }
250}
251
252impl From<PbRegionRole> for RegionRole {
253    fn from(value: PbRegionRole) -> Self {
254        match value {
255            PbRegionRole::Leader => RegionRole::Leader,
256            PbRegionRole::StagingLeader => RegionRole::StagingLeader,
257            PbRegionRole::Follower => RegionRole::Follower,
258            PbRegionRole::DowngradingLeader => RegionRole::DowngradingLeader,
259        }
260    }
261}
262
/// Output partition properties of the [RegionScanner].
#[derive(Debug)]
pub enum ScannerPartitioning {
    /// Unknown partitioning scheme with a known number of partitions.
    Unknown(usize),
}
269
270impl ScannerPartitioning {
271    /// Returns the number of partitions.
272    pub fn num_partitions(&self) -> usize {
273        match self {
274            ScannerPartitioning::Unknown(num_partitions) => *num_partitions,
275        }
276    }
277}
278
279/// Represents one data range within a partition
280#[derive(Debug, Clone, Copy, PartialEq, Eq)]
281pub struct PartitionRange {
282    /// Start time of time index column. Inclusive.
283    pub start: Timestamp,
284    /// End time of time index column. Exclusive.
285    pub end: Timestamp,
286    /// Number of rows in this range. Is used to balance ranges between partitions.
287    pub num_rows: usize,
288    /// Identifier to this range. Assigned by storage engine.
289    pub identifier: usize,
290}
291
292/// Properties of the [RegionScanner].
293#[derive(Debug, Default)]
294pub struct ScannerProperties {
295    /// A 2-dim partition ranges.
296    ///
297    /// The first dim vector's length represents the output partition number. The second
298    /// dim is ranges within one partition.
299    pub partitions: Vec<Vec<PartitionRange>>,
300
301    /// Whether scanner is in append-only mode.
302    append_mode: bool,
303
304    /// Total rows that **may** return by scanner. This field is only read iff
305    /// [ScannerProperties::append_mode] is true.
306    total_rows: usize,
307
308    /// Whether to yield an empty batch to distinguish partition ranges.
309    pub distinguish_partition_range: bool,
310
311    /// The target partitions of the scanner. 0 indicates using the number of partitions as target partitions.
312    target_partitions: usize,
313
314    /// Whether the scanner is scanning a logical region.
315    logical_region: bool,
316
317    /// Region id that should receive query-load metrics for this scanner.
318    query_load_region_id: Option<RegionId>,
319}
320
321impl ScannerProperties {
322    /// Sets append mode for scanner.
323    pub fn with_append_mode(mut self, append_mode: bool) -> Self {
324        self.append_mode = append_mode;
325        self
326    }
327
328    /// Sets total rows for scanner.
329    pub fn with_total_rows(mut self, total_rows: usize) -> Self {
330        self.total_rows = total_rows;
331        self
332    }
333
334    /// Creates a new [`ScannerProperties`] with the given partitioning.
335    pub fn new(partitions: Vec<Vec<PartitionRange>>, append_mode: bool, total_rows: usize) -> Self {
336        Self {
337            partitions,
338            append_mode,
339            total_rows,
340            distinguish_partition_range: false,
341            target_partitions: 0,
342            logical_region: false,
343            query_load_region_id: None,
344        }
345    }
346
347    /// Updates the properties with the given [PrepareRequest].
348    pub fn prepare(&mut self, request: PrepareRequest) {
349        if let Some(ranges) = request.ranges {
350            self.partitions = ranges;
351        }
352        if let Some(distinguish_partition_range) = request.distinguish_partition_range {
353            self.distinguish_partition_range = distinguish_partition_range;
354        }
355        if let Some(target_partitions) = request.target_partitions {
356            self.target_partitions = target_partitions;
357        }
358    }
359
360    /// Returns the number of actual partitions.
361    pub fn num_partitions(&self) -> usize {
362        self.partitions.len()
363    }
364
365    pub fn append_mode(&self) -> bool {
366        self.append_mode
367    }
368
369    pub fn total_rows(&self) -> usize {
370        self.total_rows
371    }
372
373    /// Returns whether the scanner is scanning a logical region.
374    pub fn is_logical_region(&self) -> bool {
375        self.logical_region
376    }
377
378    /// Returns the target partitions of the scanner. If it is not set, returns the number of partitions.
379    pub fn target_partitions(&self) -> usize {
380        if self.target_partitions == 0 {
381            self.num_partitions()
382        } else {
383            self.target_partitions
384        }
385    }
386
387    /// Sets whether the scanner is reading a logical region.
388    pub fn set_logical_region(&mut self, logical_region: bool) {
389        self.logical_region = logical_region;
390    }
391
392    /// Returns the region id that should receive query-load metrics.
393    pub fn query_load_region_id(&self) -> Option<RegionId> {
394        self.query_load_region_id
395    }
396
397    /// Sets the region id that should receive query-load metrics.
398    pub fn set_query_load_region_id(&mut self, region_id: RegionId) {
399        self.query_load_region_id = Some(region_id);
400    }
401}
402
403/// Request to override the scanner properties.
404#[derive(Default)]
405pub struct PrepareRequest {
406    /// Assigned partition ranges.
407    pub ranges: Option<Vec<Vec<PartitionRange>>>,
408    /// Distringuishes partition range by empty batches.
409    pub distinguish_partition_range: Option<bool>,
410    /// The expected number of target partitions.
411    pub target_partitions: Option<usize>,
412}
413
414impl PrepareRequest {
415    /// Sets the ranges.
416    pub fn with_ranges(mut self, ranges: Vec<Vec<PartitionRange>>) -> Self {
417        self.ranges = Some(ranges);
418        self
419    }
420
421    /// Sets the distinguish partition range flag.
422    pub fn with_distinguish_partition_range(mut self, distinguish_partition_range: bool) -> Self {
423        self.distinguish_partition_range = Some(distinguish_partition_range);
424        self
425    }
426
427    /// Sets the target partitions.
428    pub fn with_target_partitions(mut self, target_partitions: usize) -> Self {
429        self.target_partitions = Some(target_partitions);
430        self
431    }
432}
433
/// Necessary context of the query for the scanner.
#[derive(Clone, Default)]
pub struct QueryScanContext {
    /// Whether the query is EXPLAIN ANALYZE VERBOSE. Defaults to `false`.
    pub explain_verbose: bool,
}
440
441/// A scanner that provides a way to scan the region concurrently.
442///
443/// The scanner splits the region into partitions so that each partition can be scanned concurrently.
444/// You can use this trait to implement an [`ExecutionPlan`](datafusion_physical_plan::ExecutionPlan).
445pub trait RegionScanner: Debug + DisplayAs + Send {
446    fn name(&self) -> &str;
447
448    /// Returns the properties of the scanner.
449    fn properties(&self) -> &ScannerProperties;
450
451    /// Returns the schema of the record batches.
452    fn schema(&self) -> SchemaRef;
453
454    /// Returns the metadata of the region.
455    fn metadata(&self) -> RegionMetadataRef;
456
457    /// Prepares the scanner with the given partition ranges.
458    ///
459    /// This method is for the planner to adjust the scanner's behavior based on the partition ranges.
460    fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError>;
461
462    /// Scans the partition and returns a stream of record batches.
463    ///
464    /// # Panics
465    /// Panics if the `partition` is out of bound.
466    fn scan_partition(
467        &self,
468        ctx: &QueryScanContext,
469        metrics_set: &ExecutionPlanMetricsSet,
470        partition: usize,
471    ) -> Result<SendableRecordBatchStream, BoxedError>;
472
473    /// Check if there is any predicate exclude region partition exprs that may be executed in this scanner.
474    fn has_predicate_without_region(&self) -> bool;
475
476    /// Add the given dynamic filter expressions to the predicate of the scanner.
477    /// Returns a vector of booleans indicating which filter expressions were applied.
478    /// true indicates the filter expression was applied(will be use by scanner to prune by stat for row group),
479    /// false otherwise.
480    fn add_dyn_filter_to_predicate(
481        &mut self,
482        filter_exprs: Vec<Arc<dyn PhysicalExpr>>,
483    ) -> Vec<bool>;
484
485    /// Sets whether the scanner is reading a logical region.
486    fn set_logical_region(&mut self, logical_region: bool);
487
488    /// Sets the region id that should receive query-load metrics.
489    fn set_query_load_region_id(&mut self, region_id: RegionId);
490
491    fn snapshot_sequence(&self) -> Option<SequenceNumber> {
492        None
493    }
494}
495
496pub type RegionScannerRef = Box<dyn RegionScanner>;
497
498pub type BatchResponses = Vec<(RegionId, Result<RegionResponse, BoxedError>)>;
499
500/// Represents the statistics of a region.
501#[derive(Debug, Deserialize, Serialize, Default)]
502pub struct RegionStatistic {
503    /// The number of rows stored in SST files owned by this region plus rows in memtables.
504    ///
505    /// Rows from SST files referenced from other regions, for example after repartition,
506    /// are not counted to avoid table-level double counting when summing region statistics.
507    #[serde(default)]
508    pub num_rows: u64,
509    /// The size of memtable in bytes.
510    pub memtable_size: u64,
511    /// The size of WAL in bytes.
512    pub wal_size: u64,
513    /// The size of manifest in bytes.
514    pub manifest_size: u64,
515    /// The size of SST data files owned by this region in bytes.
516    ///
517    /// SST files referenced from other regions, for example after repartition, are not counted.
518    pub sst_size: u64,
519    /// The number of SST files owned by this region.
520    ///
521    /// SST files referenced from other regions, for example after repartition, are not counted.
522    pub sst_num: u64,
523    /// The size of SST index files owned by this region in bytes.
524    ///
525    /// SST index files referenced from other regions, for example after repartition, are not counted.
526    #[serde(default)]
527    pub index_size: u64,
528    /// The details of the region.
529    #[serde(default)]
530    pub manifest: RegionManifestInfo,
531    #[serde(default)]
532    /// The total bytes written of the region since region opened.
533    pub written_bytes: u64,
534    /// The latest entry id of the region's remote WAL since last flush.
535    /// For metric engine, there're two latest entry ids, one for data and one for metadata.
536    /// TODO(weny): remove this two fields and use single instead.
537    #[serde(default)]
538    pub data_topic_latest_entry_id: u64,
539    #[serde(default)]
540    pub metadata_topic_latest_entry_id: u64,
541}
542
543/// The manifest info of a region.
544#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
545pub enum RegionManifestInfo {
546    Mito {
547        manifest_version: u64,
548        flushed_entry_id: u64,
549        /// Number of files removed in the manifest's `removed_files` field.
550        file_removed_cnt: u64,
551    },
552    Metric {
553        data_manifest_version: u64,
554        data_flushed_entry_id: u64,
555        metadata_manifest_version: u64,
556        metadata_flushed_entry_id: u64,
557    },
558}
559
560impl RegionManifestInfo {
561    /// Creates a new [RegionManifestInfo] for mito2 engine.
562    pub fn mito(manifest_version: u64, flushed_entry_id: u64, file_removal_rate: u64) -> Self {
563        Self::Mito {
564            manifest_version,
565            flushed_entry_id,
566            file_removed_cnt: file_removal_rate,
567        }
568    }
569
570    /// Creates a new [RegionManifestInfo] for metric engine.
571    pub fn metric(
572        data_manifest_version: u64,
573        data_flushed_entry_id: u64,
574        metadata_manifest_version: u64,
575        metadata_flushed_entry_id: u64,
576    ) -> Self {
577        Self::Metric {
578            data_manifest_version,
579            data_flushed_entry_id,
580            metadata_manifest_version,
581            metadata_flushed_entry_id,
582        }
583    }
584
585    /// Returns true if the region is a mito2 region.
586    pub fn is_mito(&self) -> bool {
587        matches!(self, RegionManifestInfo::Mito { .. })
588    }
589
590    /// Returns true if the region is a metric region.
591    pub fn is_metric(&self) -> bool {
592        matches!(self, RegionManifestInfo::Metric { .. })
593    }
594
595    /// Returns the flushed entry id of the data region.
596    pub fn data_flushed_entry_id(&self) -> u64 {
597        match self {
598            RegionManifestInfo::Mito {
599                flushed_entry_id, ..
600            } => *flushed_entry_id,
601            RegionManifestInfo::Metric {
602                data_flushed_entry_id,
603                ..
604            } => *data_flushed_entry_id,
605        }
606    }
607
608    /// Returns the manifest version of the data region.
609    pub fn data_manifest_version(&self) -> u64 {
610        match self {
611            RegionManifestInfo::Mito {
612                manifest_version, ..
613            } => *manifest_version,
614            RegionManifestInfo::Metric {
615                data_manifest_version,
616                ..
617            } => *data_manifest_version,
618        }
619    }
620
621    /// Returns the manifest version of the metadata region.
622    pub fn metadata_manifest_version(&self) -> Option<u64> {
623        match self {
624            RegionManifestInfo::Mito { .. } => None,
625            RegionManifestInfo::Metric {
626                metadata_manifest_version,
627                ..
628            } => Some(*metadata_manifest_version),
629        }
630    }
631
632    /// Returns the flushed entry id of the metadata region.
633    pub fn metadata_flushed_entry_id(&self) -> Option<u64> {
634        match self {
635            RegionManifestInfo::Mito { .. } => None,
636            RegionManifestInfo::Metric {
637                metadata_flushed_entry_id,
638                ..
639            } => Some(*metadata_flushed_entry_id),
640        }
641    }
642
643    /// Encodes a list of ([RegionId], [RegionManifestInfo]) to a byte array.
644    pub fn encode_list(manifest_infos: &[(RegionId, Self)]) -> serde_json::Result<Vec<u8>> {
645        serde_json::to_vec(manifest_infos)
646    }
647
648    /// Decodes a list of ([RegionId], [RegionManifestInfo]) from a byte array.
649    pub fn decode_list(value: &[u8]) -> serde_json::Result<Vec<(RegionId, Self)>> {
650        serde_json::from_slice(value)
651    }
652}
653
654impl Default for RegionManifestInfo {
655    fn default() -> Self {
656        Self::Mito {
657            manifest_version: 0,
658            flushed_entry_id: 0,
659            file_removed_cnt: 0,
660        }
661    }
662}
663
664impl RegionStatistic {
665    /// Deserializes the region statistic to a byte array.
666    ///
667    /// Returns None if the deserialization fails.
668    pub fn deserialize_from_slice(value: &[u8]) -> Option<RegionStatistic> {
669        serde_json::from_slice(value).ok()
670    }
671
672    /// Serializes the region statistic to a byte array.
673    ///
674    /// Returns None if the serialization fails.
675    pub fn serialize_to_vec(&self) -> Option<Vec<u8>> {
676        serde_json::to_vec(self).ok()
677    }
678}
679
680impl RegionStatistic {
681    /// Returns the estimated disk size of the region.
682    pub fn estimated_disk_size(&self) -> u64 {
683        self.wal_size + self.sst_size + self.manifest_size + self.index_size
684    }
685}
686
687/// Request to sync the region from a manifest or a region.
688#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
689pub enum SyncRegionFromRequest {
690    /// Syncs the region using manifest information.
691    /// Used in leader-follower manifest sync scenarios.
692    FromManifest(RegionManifestInfo),
693    /// Syncs the region from another region.
694    ///
695    /// Used by the metric engine to sync logical regions from a source physical region
696    /// to a target physical region. This copies metadata region SST files and transforms
697    /// logical region entries to use the target's region number.
698    FromRegion {
699        /// The [`RegionId`] of the source region.
700        source_region_id: RegionId,
701        /// The parallelism of the sync operation.
702        parallelism: usize,
703    },
704}
705
706impl From<RegionManifestInfo> for SyncRegionFromRequest {
707    fn from(manifest_info: RegionManifestInfo) -> Self {
708        SyncRegionFromRequest::FromManifest(manifest_info)
709    }
710}
711
712impl SyncRegionFromRequest {
713    /// Creates a new request from a manifest info.
714    pub fn from_manifest(manifest_info: RegionManifestInfo) -> Self {
715        SyncRegionFromRequest::FromManifest(manifest_info)
716    }
717
718    /// Creates a new request from a region.
719    pub fn from_region(source_region_id: RegionId, parallelism: usize) -> Self {
720        SyncRegionFromRequest::FromRegion {
721            source_region_id,
722            parallelism,
723        }
724    }
725
726    /// Returns true if the request is from a manifest.
727    pub fn is_from_manifest(&self) -> bool {
728        matches!(self, SyncRegionFromRequest::FromManifest { .. })
729    }
730
731    /// Converts the request to a region manifest info.
732    ///
733    /// Returns None if the request is not from a manifest.
734    pub fn into_region_manifest_info(self) -> Option<RegionManifestInfo> {
735        match self {
736            SyncRegionFromRequest::FromManifest(manifest_info) => Some(manifest_info),
737            SyncRegionFromRequest::FromRegion { .. } => None,
738        }
739    }
740}
741
742/// The response of syncing the region.
743#[derive(Debug)]
744pub enum SyncRegionFromResponse {
745    NotSupported,
746    Mito {
747        /// Indicates if the data region was synced.
748        synced: bool,
749    },
750    Metric {
751        /// Indicates if the metadata region was synced.
752        metadata_synced: bool,
753        /// Indicates if the data region was synced.
754        data_synced: bool,
755        /// The logical regions that were newly opened during the sync operation.
756        /// This only occurs after the metadata region has been successfully synced.
757        new_opened_logical_region_ids: Vec<RegionId>,
758    },
759}
760
761impl SyncRegionFromResponse {
762    /// Returns true if data region is synced.
763    pub fn is_data_synced(&self) -> bool {
764        match self {
765            SyncRegionFromResponse::NotSupported => false,
766            SyncRegionFromResponse::Mito { synced } => *synced,
767            SyncRegionFromResponse::Metric { data_synced, .. } => *data_synced,
768        }
769    }
770
771    /// Returns true if the engine is a mito2 engine.
772    pub fn is_mito(&self) -> bool {
773        matches!(self, SyncRegionFromResponse::Mito { .. })
774    }
775
776    /// Returns true if the engine is a metric engine.
777    pub fn is_metric(&self) -> bool {
778        matches!(self, SyncRegionFromResponse::Metric { .. })
779    }
780
781    /// Returns the new opened logical region ids.
782    pub fn new_opened_logical_region_ids(self) -> Option<Vec<RegionId>> {
783        match self {
784            SyncRegionFromResponse::Metric {
785                new_opened_logical_region_ids,
786                ..
787            } => Some(new_opened_logical_region_ids),
788            _ => None,
789        }
790    }
791}
792
793/// Request to remap manifests from old regions to new regions.
794#[derive(Debug, Clone)]
795pub struct RemapManifestsRequest {
796    /// The [`RegionId`] of a staging region used to obtain table directory and storage configuration for the remap operation.
797    pub region_id: RegionId,
798    /// Regions to remap manifests from.
799    pub input_regions: Vec<RegionId>,
800    /// For each old region, which new regions should receive its files
801    pub region_mapping: HashMap<RegionId, Vec<RegionId>>,
802    /// New partition expressions for the new regions.
803    pub new_partition_exprs: HashMap<RegionId, String>,
804}
805
806/// Response to remap manifests from old regions to new regions.
807#[derive(Debug, Clone)]
808pub struct RemapManifestsResponse {
809    /// Maps region id to its staging manifest path.
810    ///
811    /// These paths are relative paths within the central region's staging blob storage,
812    /// and should be passed to [`ApplyStagingManifestRequest`](RegionRequest::ApplyStagingManifest) to finalize the repartition.
813    pub manifest_paths: HashMap<RegionId, String>,
814}
815
816/// Request to copy files from a source region to a target region.
817#[derive(Debug, Clone)]
818pub struct MitoCopyRegionFromRequest {
819    /// The [`RegionId`] of the source region.
820    pub source_region_id: RegionId,
821    /// The parallelism of the copy operation.
822    pub parallelism: usize,
823}
824
825#[derive(Debug, Clone)]
826pub struct MitoCopyRegionFromResponse {
827    /// The file ids that were copied from the source region to the target region.
828    pub copied_file_ids: Vec<FileId>,
829}
830
831#[async_trait]
832pub trait RegionEngine: Send + Sync {
833    /// Name of this engine
834    fn name(&self) -> &str;
835
836    /// Handles batch open region requests.
837    async fn handle_batch_open_requests(
838        &self,
839        parallelism: usize,
840        requests: Vec<(RegionId, RegionOpenRequest)>,
841    ) -> Result<BatchResponses, BoxedError> {
842        let semaphore = Arc::new(Semaphore::new(parallelism));
843        let mut tasks = Vec::with_capacity(requests.len());
844
845        for (region_id, request) in requests {
846            let semaphore_moved = semaphore.clone();
847
848            tasks.push(async move {
849                // Safety: semaphore must exist
850                let _permit = semaphore_moved.acquire().await.unwrap();
851                let result = self
852                    .handle_request(region_id, RegionRequest::Open(request))
853                    .await;
854                (region_id, result)
855            });
856        }
857
858        Ok(join_all(tasks).await)
859    }
860
861    async fn handle_batch_catchup_requests(
862        &self,
863        parallelism: usize,
864        requests: Vec<(RegionId, RegionCatchupRequest)>,
865    ) -> Result<BatchResponses, BoxedError> {
866        let semaphore = Arc::new(Semaphore::new(parallelism));
867        let mut tasks = Vec::with_capacity(requests.len());
868
869        for (region_id, request) in requests {
870            let semaphore_moved = semaphore.clone();
871
872            tasks.push(async move {
873                // Safety: semaphore must exist
874                let _permit = semaphore_moved.acquire().await.unwrap();
875                let result = self
876                    .handle_request(region_id, RegionRequest::Catchup(request))
877                    .await;
878                (region_id, result)
879            });
880        }
881
882        Ok(join_all(tasks).await)
883    }
884
885    async fn handle_batch_ddl_requests(
886        &self,
887        request: BatchRegionDdlRequest,
888    ) -> Result<RegionResponse, BoxedError> {
889        let requests = request.into_region_requests();
890
891        let mut affected_rows = 0;
892        let mut extensions = HashMap::new();
893
894        for (region_id, request) in requests {
895            let result = self.handle_request(region_id, request).await?;
896            affected_rows += result.affected_rows;
897            extensions.extend(result.extensions);
898        }
899
900        Ok(RegionResponse {
901            affected_rows,
902            extensions,
903            metadata: Vec::new(),
904        })
905    }
906
907    /// Handles non-query request to the region. Returns the count of affected rows.
908    async fn handle_request(
909        &self,
910        region_id: RegionId,
911        request: RegionRequest,
912    ) -> Result<RegionResponse, BoxedError>;
913
914    /// Returns the committed sequence (sequence of latest written data).
915    async fn get_committed_sequence(
916        &self,
917        region_id: RegionId,
918    ) -> Result<SequenceNumber, BoxedError>;
919
920    /// Handles query and return a scanner that can be used to scan the region concurrently.
921    async fn handle_query(
922        &self,
923        region_id: RegionId,
924        request: ScanRequest,
925    ) -> Result<RegionScannerRef, BoxedError>;
926
927    /// Returns the query memory tracker for scan execution.
928    fn query_memory_tracker(&self) -> Option<QueryMemoryTracker> {
929        None
930    }
931
932    /// Retrieves region's metadata.
933    async fn get_metadata(&self, region_id: RegionId) -> Result<RegionMetadataRef, BoxedError>;
934
935    /// Retrieves region's statistic.
936    fn region_statistic(&self, region_id: RegionId) -> Option<RegionStatistic>;
937
938    /// Stops the engine
939    async fn stop(&self) -> Result<(), BoxedError>;
940
941    /// Sets [RegionRole] for a region.
942    ///
943    /// The engine checks whether the region is writable before writing to the region. Setting
944    /// the region as readonly doesn't guarantee that write operations in progress will not
945    /// take effect.
946    fn set_region_role(&self, region_id: RegionId, role: RegionRole) -> Result<(), BoxedError>;
947
948    /// Syncs the region manifest to the given manifest version.
949    async fn sync_region(
950        &self,
951        region_id: RegionId,
952        request: SyncRegionFromRequest,
953    ) -> Result<SyncRegionFromResponse, BoxedError>;
954
955    /// Remaps manifests from old regions to new regions.
956    async fn remap_manifests(
957        &self,
958        request: RemapManifestsRequest,
959    ) -> Result<RemapManifestsResponse, BoxedError>;
960
961    /// Sets region role state gracefully.
962    ///
963    /// After the call returns, the engine ensures no more write operations will succeed in the region.
964    async fn set_region_role_state_gracefully(
965        &self,
966        region_id: RegionId,
967        region_role_state: SettableRegionRoleState,
968    ) -> Result<SetRegionRoleStateResponse, BoxedError>;
969
970    /// Indicates region role.
971    ///
972    /// Returns the `None` if the region is not found.
973    fn role(&self, region_id: RegionId) -> Option<RegionRole>;
974
975    fn as_any(&self) -> &dyn Any;
976}
977
/// Shared, reference-counted handle to a dynamically dispatched [`RegionEngine`].
pub type RegionEngineRef = Arc<dyn RegionEngine>;
979
/// A [RegionScanner] that only scans a single partition.
pub struct SinglePartitionScanner {
    /// The stream to hand out; it is taken out (leaving `None`) by the first scan.
    stream: Mutex<Option<SendableRecordBatchStream>>,
    /// Schema of `stream`, captured when the scanner is created.
    schema: SchemaRef,
    /// Properties reported by this scanner.
    properties: ScannerProperties,
    /// Metadata of the scanned region.
    metadata: RegionMetadataRef,
    /// Snapshot sequence reported by this scanner, if any.
    snapshot_sequence: Option<SequenceNumber>,
}
988
989impl SinglePartitionScanner {
990    /// Creates a new [SinglePartitionScanner] with the given stream and metadata.
991    pub fn new(
992        stream: SendableRecordBatchStream,
993        append_mode: bool,
994        metadata: RegionMetadataRef,
995        snapshot_sequence: Option<SequenceNumber>,
996    ) -> Self {
997        let schema = stream.schema();
998        Self {
999            stream: Mutex::new(Some(stream)),
1000            schema,
1001            properties: ScannerProperties::default().with_append_mode(append_mode),
1002            metadata,
1003            snapshot_sequence,
1004        }
1005    }
1006}
1007
1008impl Debug for SinglePartitionScanner {
1009    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1010        write!(f, "SinglePartitionScanner: <SendableRecordBatchStream>")
1011    }
1012}
1013
1014impl RegionScanner for SinglePartitionScanner {
1015    fn name(&self) -> &str {
1016        "SinglePartition"
1017    }
1018
1019    fn properties(&self) -> &ScannerProperties {
1020        &self.properties
1021    }
1022
1023    fn schema(&self) -> SchemaRef {
1024        self.schema.clone()
1025    }
1026
1027    fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> {
1028        self.properties.prepare(request);
1029        Ok(())
1030    }
1031
1032    fn scan_partition(
1033        &self,
1034        _ctx: &QueryScanContext,
1035        _metrics_set: &ExecutionPlanMetricsSet,
1036        _partition: usize,
1037    ) -> Result<SendableRecordBatchStream, BoxedError> {
1038        let mut stream = self.stream.lock().unwrap();
1039        let result = stream
1040            .take()
1041            .or_else(|| Some(Box::pin(EmptyRecordBatchStream::new(self.schema.clone()))));
1042        Ok(result.unwrap())
1043    }
1044
1045    fn has_predicate_without_region(&self) -> bool {
1046        false
1047    }
1048
1049    fn add_dyn_filter_to_predicate(
1050        &mut self,
1051        filter_exprs: Vec<Arc<dyn datafusion_physical_plan::PhysicalExpr>>,
1052    ) -> Vec<bool> {
1053        vec![false; filter_exprs.len()]
1054    }
1055
1056    fn metadata(&self) -> RegionMetadataRef {
1057        self.metadata.clone()
1058    }
1059
1060    fn set_logical_region(&mut self, logical_region: bool) {
1061        self.properties.set_logical_region(logical_region);
1062    }
1063
1064    fn set_query_load_region_id(&mut self, region_id: RegionId) {
1065        self.properties.set_query_load_region_id(region_id);
1066    }
1067
1068    fn snapshot_sequence(&self) -> Option<SequenceNumber> {
1069        self.snapshot_sequence
1070    }
1071}
1072
1073impl DisplayAs for SinglePartitionScanner {
1074    fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
1075        write!(f, "{:?}", self)
1076    }
1077}