store_api/
region_engine.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Region Engine's definition
16
17use std::any::Any;
18use std::collections::HashMap;
19use std::fmt::{Debug, Display};
20use std::sync::{Arc, Mutex};
21
22use api::greptime_proto::v1::meta::{GrantedRegion as PbGrantedRegion, RegionRole as PbRegionRole};
23use api::region::RegionResponse;
24use async_trait::async_trait;
25use common_error::ext::BoxedError;
26use common_recordbatch::{EmptyRecordBatchStream, QueryMemoryTracker, SendableRecordBatchStream};
27use common_time::Timestamp;
28use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
29use datafusion_physical_plan::{DisplayAs, DisplayFormatType, PhysicalExpr};
30use datatypes::schema::SchemaRef;
31use futures::future::join_all;
32use serde::{Deserialize, Serialize};
33use tokio::sync::Semaphore;
34
35use crate::logstore::entry;
36use crate::metadata::RegionMetadataRef;
37use crate::region_request::{
38    BatchRegionDdlRequest, RegionCatchupRequest, RegionOpenRequest, RegionRequest,
39};
40use crate::storage::{FileId, RegionId, ScanRequest, SequenceNumber};
41
/// The region role states that can be explicitly set on a region.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum SettableRegionRoleState {
    Follower,
    DowngradingLeader,
    /// Exit staging mode and return to normal leader state. Only allowed from staging state.
    Leader,
    /// Enter staging mode. Region remains writable but disables checkpoint and compaction. Only allowed from normal leader state.
    StagingLeader,
}

impl Display for SettableRegionRoleState {
    /// Writes the human-readable label of the state, e.g. `Leader(Staging)`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Pick the label first, then emit it in a single write.
        let label = match self {
            Self::Follower => "Follower",
            Self::DowngradingLeader => "Leader(Downgrading)",
            Self::Leader => "Leader",
            Self::StagingLeader => "Leader(Staging)",
        };
        f.write_str(label)
    }
}
63
64impl From<SettableRegionRoleState> for RegionRole {
65    fn from(value: SettableRegionRoleState) -> Self {
66        match value {
67            SettableRegionRoleState::Follower => RegionRole::Follower,
68            SettableRegionRoleState::DowngradingLeader => RegionRole::DowngradingLeader,
69            SettableRegionRoleState::Leader => RegionRole::Leader,
70            SettableRegionRoleState::StagingLeader => RegionRole::Leader, // Still a leader role
71        }
72    }
73}
74
/// The request to set region role state.
#[derive(Debug, PartialEq, Eq)]
pub struct SetRegionRoleStateRequest {
    /// The region the request refers to.
    region_id: RegionId,
    /// The role state requested for the region.
    region_role_state: SettableRegionRoleState,
}
81
/// The success response of setting region role state.
///
/// Each variant carries the entry ids that are available for that kind of region.
#[derive(Debug, PartialEq, Eq)]
pub enum SetRegionRoleStateSuccess {
    /// Success without any entry id.
    File,
    /// Success carrying a single last entry id.
    Mito {
        /// The last entry id of the region.
        last_entry_id: entry::Id,
    },
    /// Success carrying the last entry ids of both the region and its metadata.
    Metric {
        /// The last entry id of the region.
        last_entry_id: entry::Id,
        /// The last entry id of the metadata of the region.
        metadata_last_entry_id: entry::Id,
    },
}
94
95impl SetRegionRoleStateSuccess {
96    /// Returns a [SetRegionRoleStateSuccess::File].
97    pub fn file() -> Self {
98        Self::File
99    }
100
101    /// Returns a [SetRegionRoleStateSuccess::Mito] with the `last_entry_id`.
102    pub fn mito(last_entry_id: entry::Id) -> Self {
103        SetRegionRoleStateSuccess::Mito { last_entry_id }
104    }
105
106    /// Returns a [SetRegionRoleStateSuccess::Metric] with the `last_entry_id` and `metadata_last_entry_id`.
107    pub fn metric(last_entry_id: entry::Id, metadata_last_entry_id: entry::Id) -> Self {
108        SetRegionRoleStateSuccess::Metric {
109            last_entry_id,
110            metadata_last_entry_id,
111        }
112    }
113}
114
115impl SetRegionRoleStateSuccess {
116    /// Returns the last entry id of the region.
117    pub fn last_entry_id(&self) -> Option<entry::Id> {
118        match self {
119            SetRegionRoleStateSuccess::File => None,
120            SetRegionRoleStateSuccess::Mito { last_entry_id } => Some(*last_entry_id),
121            SetRegionRoleStateSuccess::Metric { last_entry_id, .. } => Some(*last_entry_id),
122        }
123    }
124
125    /// Returns the last entry id of the metadata of the region.
126    pub fn metadata_last_entry_id(&self) -> Option<entry::Id> {
127        match self {
128            SetRegionRoleStateSuccess::File => None,
129            SetRegionRoleStateSuccess::Mito { .. } => None,
130            SetRegionRoleStateSuccess::Metric {
131                metadata_last_entry_id,
132                ..
133            } => Some(*metadata_last_entry_id),
134        }
135    }
136}
137
/// The response of setting region role state.
#[derive(Debug)]
pub enum SetRegionRoleStateResponse {
    /// The role state was set; carries the engine-specific success payload.
    Success(SetRegionRoleStateSuccess),
    /// NOTE(review): presumably the target region does not exist in the engine — confirm
    /// against the engine implementations.
    NotFound,
    /// The requested transition was rejected; carries the error describing why.
    InvalidTransition(BoxedError),
}
145
146impl SetRegionRoleStateResponse {
147    /// Returns a [SetRegionRoleStateResponse::Success] with the `File` success.
148    pub fn success(success: SetRegionRoleStateSuccess) -> Self {
149        Self::Success(success)
150    }
151
152    /// Returns a [SetRegionRoleStateResponse::InvalidTransition] with the error.
153    pub fn invalid_transition(error: BoxedError) -> Self {
154        Self::InvalidTransition(error)
155    }
156
157    /// Returns true if the response is a [SetRegionRoleStateResponse::NotFound].
158    pub fn is_not_found(&self) -> bool {
159        matches!(self, SetRegionRoleStateResponse::NotFound)
160    }
161
162    /// Returns true if the response is a [SetRegionRoleStateResponse::InvalidTransition].
163    pub fn is_invalid_transition(&self) -> bool {
164        matches!(self, SetRegionRoleStateResponse::InvalidTransition(_))
165    }
166}
167
/// A region together with the role granted to it.
///
/// Converts to and from the protobuf `GrantedRegion` message.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct GrantedRegion {
    /// The id of the region.
    pub region_id: RegionId,
    /// The role granted to the region.
    pub region_role: RegionRole,
    /// Extension key/value pairs; carried over unchanged when converting to or from
    /// the protobuf message.
    pub extensions: HashMap<String, Vec<u8>>,
}
174
175impl GrantedRegion {
176    pub fn new(region_id: RegionId, region_role: RegionRole) -> Self {
177        Self {
178            region_id,
179            region_role,
180            extensions: HashMap::new(),
181        }
182    }
183}
184
185impl From<GrantedRegion> for PbGrantedRegion {
186    fn from(value: GrantedRegion) -> Self {
187        PbGrantedRegion {
188            region_id: value.region_id.as_u64(),
189            role: PbRegionRole::from(value.region_role).into(),
190            extensions: value.extensions,
191        }
192    }
193}
194
195impl From<PbGrantedRegion> for GrantedRegion {
196    fn from(value: PbGrantedRegion) -> Self {
197        GrantedRegion {
198            region_id: RegionId::from_u64(value.region_id),
199            region_role: value.role().into(),
200            extensions: value.extensions,
201        }
202    }
203}
204
/// The role of the region.
/// TODO(weny): rename it to `RegionRoleState`
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum RegionRole {
    /// Readonly region(mito2)
    Follower,
    /// Writable region(mito2), Readonly region(file).
    Leader,
    /// Leader is downgrading to follower.
    ///
    /// This state is used to prevent new write requests.
    DowngradingLeader,
}
218
219impl Display for RegionRole {
220    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
221        match self {
222            RegionRole::Follower => write!(f, "Follower"),
223            RegionRole::Leader => write!(f, "Leader"),
224            RegionRole::DowngradingLeader => write!(f, "Leader(Downgrading)"),
225        }
226    }
227}
228
229impl RegionRole {
230    pub fn writable(&self) -> bool {
231        matches!(self, RegionRole::Leader)
232    }
233}
234
235impl From<RegionRole> for PbRegionRole {
236    fn from(value: RegionRole) -> Self {
237        match value {
238            RegionRole::Follower => PbRegionRole::Follower,
239            RegionRole::Leader => PbRegionRole::Leader,
240            RegionRole::DowngradingLeader => PbRegionRole::DowngradingLeader,
241        }
242    }
243}
244
245impl From<PbRegionRole> for RegionRole {
246    fn from(value: PbRegionRole) -> Self {
247        match value {
248            PbRegionRole::Leader => RegionRole::Leader,
249            PbRegionRole::Follower => RegionRole::Follower,
250            PbRegionRole::DowngradingLeader => RegionRole::DowngradingLeader,
251        }
252    }
253}
254
/// Output partition properties of the [RegionScanner].
#[derive(Debug)]
pub enum ScannerPartitioning {
    /// Unknown partitioning scheme with a known number of partitions
    Unknown(usize),
}

impl ScannerPartitioning {
    /// Returns the number of partitions carried by the partitioning scheme.
    pub fn num_partitions(&self) -> usize {
        // `Unknown` is the only variant, so the pattern is irrefutable.
        let Self::Unknown(count) = self;
        *count
    }
}
270
/// Represents one data range within a partition.
///
/// The range covers the half-open interval `[start, end)` of the time index column.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct PartitionRange {
    /// Start time of time index column. Inclusive.
    pub start: Timestamp,
    /// End time of time index column. Exclusive.
    pub end: Timestamp,
    /// Number of rows in this range. Is used to balance ranges between partitions.
    pub num_rows: usize,
    /// Identifier to this range. Assigned by storage engine.
    pub identifier: usize,
}
283
/// Properties of the [RegionScanner].
///
/// The derived `Default` yields no partitions, all flags `false` and all counters `0`.
#[derive(Debug, Default)]
pub struct ScannerProperties {
    /// A 2-dim partition ranges.
    ///
    /// The first dim vector's length represents the output partition number. The second
    /// dim is ranges within one partition.
    pub partitions: Vec<Vec<PartitionRange>>,

    /// Whether scanner is in append-only mode.
    append_mode: bool,

    /// Total rows that **may** be returned by the scanner. This field is only read iff
    /// [ScannerProperties::append_mode] is true.
    total_rows: usize,

    /// Whether to yield an empty batch to distinguish partition ranges.
    pub distinguish_partition_range: bool,

    /// The target partitions of the scanner. 0 indicates using the number of partitions as target partitions.
    target_partitions: usize,

    /// Whether the scanner is scanning a logical region.
    logical_region: bool,
}
309
310impl ScannerProperties {
311    /// Sets append mode for scanner.
312    pub fn with_append_mode(mut self, append_mode: bool) -> Self {
313        self.append_mode = append_mode;
314        self
315    }
316
317    /// Sets total rows for scanner.
318    pub fn with_total_rows(mut self, total_rows: usize) -> Self {
319        self.total_rows = total_rows;
320        self
321    }
322
323    /// Creates a new [`ScannerProperties`] with the given partitioning.
324    pub fn new(partitions: Vec<Vec<PartitionRange>>, append_mode: bool, total_rows: usize) -> Self {
325        Self {
326            partitions,
327            append_mode,
328            total_rows,
329            distinguish_partition_range: false,
330            target_partitions: 0,
331            logical_region: false,
332        }
333    }
334
335    /// Updates the properties with the given [PrepareRequest].
336    pub fn prepare(&mut self, request: PrepareRequest) {
337        if let Some(ranges) = request.ranges {
338            self.partitions = ranges;
339        }
340        if let Some(distinguish_partition_range) = request.distinguish_partition_range {
341            self.distinguish_partition_range = distinguish_partition_range;
342        }
343        if let Some(target_partitions) = request.target_partitions {
344            self.target_partitions = target_partitions;
345        }
346    }
347
348    /// Returns the number of actual partitions.
349    pub fn num_partitions(&self) -> usize {
350        self.partitions.len()
351    }
352
353    pub fn append_mode(&self) -> bool {
354        self.append_mode
355    }
356
357    pub fn total_rows(&self) -> usize {
358        self.total_rows
359    }
360
361    /// Returns whether the scanner is scanning a logical region.
362    pub fn is_logical_region(&self) -> bool {
363        self.logical_region
364    }
365
366    /// Returns the target partitions of the scanner. If it is not set, returns the number of partitions.
367    pub fn target_partitions(&self) -> usize {
368        if self.target_partitions == 0 {
369            self.num_partitions()
370        } else {
371            self.target_partitions
372        }
373    }
374
375    /// Sets whether the scanner is reading a logical region.
376    pub fn set_logical_region(&mut self, logical_region: bool) {
377        self.logical_region = logical_region;
378    }
379}
380
/// Request to override the scanner properties.
///
/// Every field is optional; a `None` field leaves the corresponding property untouched.
#[derive(Default)]
pub struct PrepareRequest {
    /// Assigned partition ranges.
    pub ranges: Option<Vec<Vec<PartitionRange>>>,
    /// Distinguishes partition ranges by empty batches.
    pub distinguish_partition_range: Option<bool>,
    /// The expected number of target partitions.
    pub target_partitions: Option<usize>,
}
391
392impl PrepareRequest {
393    /// Sets the ranges.
394    pub fn with_ranges(mut self, ranges: Vec<Vec<PartitionRange>>) -> Self {
395        self.ranges = Some(ranges);
396        self
397    }
398
399    /// Sets the distinguish partition range flag.
400    pub fn with_distinguish_partition_range(mut self, distinguish_partition_range: bool) -> Self {
401        self.distinguish_partition_range = Some(distinguish_partition_range);
402        self
403    }
404
405    /// Sets the target partitions.
406    pub fn with_target_partitions(mut self, target_partitions: usize) -> Self {
407        self.target_partitions = Some(target_partitions);
408        self
409    }
410}
411
/// Necessary context of the query for the scanner.
///
/// The derived `Default` has `explain_verbose` set to `false`.
#[derive(Clone, Default)]
pub struct QueryScanContext {
    /// Whether the query is EXPLAIN ANALYZE VERBOSE.
    pub explain_verbose: bool,
}
418
/// A scanner that provides a way to scan the region concurrently.
///
/// The scanner splits the region into partitions so that each partition can be scanned concurrently.
/// You can use this trait to implement an [`ExecutionPlan`](datafusion_physical_plan::ExecutionPlan).
pub trait RegionScanner: Debug + DisplayAs + Send {
    /// Returns the name of the scanner.
    fn name(&self) -> &str;

    /// Returns the properties of the scanner.
    fn properties(&self) -> &ScannerProperties;

    /// Returns the schema of the record batches.
    fn schema(&self) -> SchemaRef;

    /// Returns the metadata of the region.
    fn metadata(&self) -> RegionMetadataRef;

    /// Prepares the scanner with the given partition ranges.
    ///
    /// This method is for the planner to adjust the scanner's behavior based on the partition ranges.
    fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError>;

    /// Scans the partition and returns a stream of record batches.
    ///
    /// # Panics
    /// Panics if the `partition` is out of bound.
    fn scan_partition(
        &self,
        ctx: &QueryScanContext,
        metrics_set: &ExecutionPlanMetricsSet,
        partition: usize,
    ) -> Result<SendableRecordBatchStream, BoxedError>;

    /// Checks whether the scanner may execute any predicate other than the region
    /// partition exprs.
    fn has_predicate_without_region(&self) -> bool;

    /// Adds the given dynamic filter expressions to the predicate of the scanner.
    ///
    /// Returns a vector of booleans indicating which filter expressions were applied:
    /// `true` means the expression was applied (it will be used by the scanner to prune
    /// row groups by statistics), `false` otherwise.
    fn add_dyn_filter_to_predicate(
        &mut self,
        filter_exprs: Vec<Arc<dyn PhysicalExpr>>,
    ) -> Vec<bool>;

    /// Sets whether the scanner is reading a logical region.
    fn set_logical_region(&mut self, logical_region: bool);

    /// Returns the snapshot sequence of the scanner, if any.
    ///
    /// The default implementation returns `None`.
    fn snapshot_sequence(&self) -> Option<SequenceNumber> {
        None
    }
}
470
/// An owned, boxed [`RegionScanner`] trait object.
pub type RegionScannerRef = Box<dyn RegionScanner>;
472
/// Per-region outcomes of a batch operation: each [`RegionId`] paired with the result
/// of handling its request.
pub type BatchResponses = Vec<(RegionId, Result<RegionResponse, BoxedError>)>;
474
/// Represents the statistics of a region.
///
/// Fields marked `#[serde(default)]` fall back to their default value when they are
/// missing from the serialized input.
#[derive(Debug, Deserialize, Serialize, Default)]
pub struct RegionStatistic {
    /// The number of rows
    #[serde(default)]
    pub num_rows: u64,
    /// The size of memtable in bytes.
    pub memtable_size: u64,
    /// The size of WAL in bytes.
    pub wal_size: u64,
    /// The size of manifest in bytes.
    pub manifest_size: u64,
    /// The size of SST data files in bytes.
    pub sst_size: u64,
    /// The num of SST files.
    pub sst_num: u64,
    /// The size of SST index files in bytes.
    #[serde(default)]
    pub index_size: u64,
    /// The manifest info of the region.
    #[serde(default)]
    pub manifest: RegionManifestInfo,
    /// The total bytes written of the region since region opened.
    #[serde(default)]
    pub written_bytes: u64,
    /// The latest entry id of the region's remote WAL since last flush.
    /// For metric engine, there're two latest entry ids, one for data and one for metadata.
    /// TODO(weny): remove these two fields and use a single one instead.
    #[serde(default)]
    pub data_topic_latest_entry_id: u64,
    /// The metadata counterpart of `data_topic_latest_entry_id` (see the note above).
    #[serde(default)]
    pub metadata_topic_latest_entry_id: u64,
}
508
/// The manifest info of a region.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum RegionManifestInfo {
    /// Manifest info of a mito2 region.
    Mito {
        /// The manifest version of the region.
        manifest_version: u64,
        /// The flushed entry id of the region.
        flushed_entry_id: u64,
        /// Number of files removed in the manifest's `removed_files` field.
        file_removed_cnt: u64,
    },
    /// Manifest info of a metric region, which tracks a data region and a metadata region.
    Metric {
        /// The manifest version of the data region.
        data_manifest_version: u64,
        /// The flushed entry id of the data region.
        data_flushed_entry_id: u64,
        /// The manifest version of the metadata region.
        metadata_manifest_version: u64,
        /// The flushed entry id of the metadata region.
        metadata_flushed_entry_id: u64,
    },
}
525
526impl RegionManifestInfo {
527    /// Creates a new [RegionManifestInfo] for mito2 engine.
528    pub fn mito(manifest_version: u64, flushed_entry_id: u64, file_removal_rate: u64) -> Self {
529        Self::Mito {
530            manifest_version,
531            flushed_entry_id,
532            file_removed_cnt: file_removal_rate,
533        }
534    }
535
536    /// Creates a new [RegionManifestInfo] for metric engine.
537    pub fn metric(
538        data_manifest_version: u64,
539        data_flushed_entry_id: u64,
540        metadata_manifest_version: u64,
541        metadata_flushed_entry_id: u64,
542    ) -> Self {
543        Self::Metric {
544            data_manifest_version,
545            data_flushed_entry_id,
546            metadata_manifest_version,
547            metadata_flushed_entry_id,
548        }
549    }
550
551    /// Returns true if the region is a mito2 region.
552    pub fn is_mito(&self) -> bool {
553        matches!(self, RegionManifestInfo::Mito { .. })
554    }
555
556    /// Returns true if the region is a metric region.
557    pub fn is_metric(&self) -> bool {
558        matches!(self, RegionManifestInfo::Metric { .. })
559    }
560
561    /// Returns the flushed entry id of the data region.
562    pub fn data_flushed_entry_id(&self) -> u64 {
563        match self {
564            RegionManifestInfo::Mito {
565                flushed_entry_id, ..
566            } => *flushed_entry_id,
567            RegionManifestInfo::Metric {
568                data_flushed_entry_id,
569                ..
570            } => *data_flushed_entry_id,
571        }
572    }
573
574    /// Returns the manifest version of the data region.
575    pub fn data_manifest_version(&self) -> u64 {
576        match self {
577            RegionManifestInfo::Mito {
578                manifest_version, ..
579            } => *manifest_version,
580            RegionManifestInfo::Metric {
581                data_manifest_version,
582                ..
583            } => *data_manifest_version,
584        }
585    }
586
587    /// Returns the manifest version of the metadata region.
588    pub fn metadata_manifest_version(&self) -> Option<u64> {
589        match self {
590            RegionManifestInfo::Mito { .. } => None,
591            RegionManifestInfo::Metric {
592                metadata_manifest_version,
593                ..
594            } => Some(*metadata_manifest_version),
595        }
596    }
597
598    /// Returns the flushed entry id of the metadata region.
599    pub fn metadata_flushed_entry_id(&self) -> Option<u64> {
600        match self {
601            RegionManifestInfo::Mito { .. } => None,
602            RegionManifestInfo::Metric {
603                metadata_flushed_entry_id,
604                ..
605            } => Some(*metadata_flushed_entry_id),
606        }
607    }
608
609    /// Encodes a list of ([RegionId], [RegionManifestInfo]) to a byte array.
610    pub fn encode_list(manifest_infos: &[(RegionId, Self)]) -> serde_json::Result<Vec<u8>> {
611        serde_json::to_vec(manifest_infos)
612    }
613
614    /// Decodes a list of ([RegionId], [RegionManifestInfo]) from a byte array.
615    pub fn decode_list(value: &[u8]) -> serde_json::Result<Vec<(RegionId, Self)>> {
616        serde_json::from_slice(value)
617    }
618}
619
620impl Default for RegionManifestInfo {
621    fn default() -> Self {
622        Self::Mito {
623            manifest_version: 0,
624            flushed_entry_id: 0,
625            file_removed_cnt: 0,
626        }
627    }
628}
629
630impl RegionStatistic {
631    /// Deserializes the region statistic to a byte array.
632    ///
633    /// Returns None if the deserialization fails.
634    pub fn deserialize_from_slice(value: &[u8]) -> Option<RegionStatistic> {
635        serde_json::from_slice(value).ok()
636    }
637
638    /// Serializes the region statistic to a byte array.
639    ///
640    /// Returns None if the serialization fails.
641    pub fn serialize_to_vec(&self) -> Option<Vec<u8>> {
642        serde_json::to_vec(self).ok()
643    }
644}
645
646impl RegionStatistic {
647    /// Returns the estimated disk size of the region.
648    pub fn estimated_disk_size(&self) -> u64 {
649        self.wal_size + self.sst_size + self.manifest_size + self.index_size
650    }
651}
652
/// Request to sync the region from a manifest or a region.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum SyncRegionFromRequest {
    /// Syncs the region using manifest information.
    /// Used in leader-follower manifest sync scenarios.
    FromManifest(RegionManifestInfo),
    /// Syncs the region from another region.
    ///
    /// Used by the metric engine to sync logical regions from a source physical region
    /// to a target physical region. This copies metadata region SST files and transforms
    /// logical region entries to use the target's region number.
    FromRegion {
        /// The [`RegionId`] of the source region.
        source_region_id: RegionId,
        /// The parallelism of the sync operation.
        parallelism: usize,
    },
}
671
672impl From<RegionManifestInfo> for SyncRegionFromRequest {
673    fn from(manifest_info: RegionManifestInfo) -> Self {
674        SyncRegionFromRequest::FromManifest(manifest_info)
675    }
676}
677
678impl SyncRegionFromRequest {
679    /// Creates a new request from a manifest info.
680    pub fn from_manifest(manifest_info: RegionManifestInfo) -> Self {
681        SyncRegionFromRequest::FromManifest(manifest_info)
682    }
683
684    /// Creates a new request from a region.
685    pub fn from_region(source_region_id: RegionId, parallelism: usize) -> Self {
686        SyncRegionFromRequest::FromRegion {
687            source_region_id,
688            parallelism,
689        }
690    }
691
692    /// Returns true if the request is from a manifest.
693    pub fn is_from_manifest(&self) -> bool {
694        matches!(self, SyncRegionFromRequest::FromManifest { .. })
695    }
696
697    /// Converts the request to a region manifest info.
698    ///
699    /// Returns None if the request is not from a manifest.
700    pub fn into_region_manifest_info(self) -> Option<RegionManifestInfo> {
701        match self {
702            SyncRegionFromRequest::FromManifest(manifest_info) => Some(manifest_info),
703            SyncRegionFromRequest::FromRegion { .. } => None,
704        }
705    }
706}
707
/// The response of syncing the region.
#[derive(Debug)]
pub enum SyncRegionFromResponse {
    /// NOTE(review): presumably returned by engines that do not support syncing —
    /// confirm against the engine implementations.
    NotSupported,
    /// Response of a mito2 engine.
    Mito {
        /// Indicates if the data region was synced.
        synced: bool,
    },
    /// Response of a metric engine.
    Metric {
        /// Indicates if the metadata region was synced.
        metadata_synced: bool,
        /// Indicates if the data region was synced.
        data_synced: bool,
        /// The logical regions that were newly opened during the sync operation.
        /// This only occurs after the metadata region has been successfully synced.
        new_opened_logical_region_ids: Vec<RegionId>,
    },
}
726
727impl SyncRegionFromResponse {
728    /// Returns true if data region is synced.
729    pub fn is_data_synced(&self) -> bool {
730        match self {
731            SyncRegionFromResponse::NotSupported => false,
732            SyncRegionFromResponse::Mito { synced } => *synced,
733            SyncRegionFromResponse::Metric { data_synced, .. } => *data_synced,
734        }
735    }
736
737    /// Returns true if the engine is a mito2 engine.
738    pub fn is_mito(&self) -> bool {
739        matches!(self, SyncRegionFromResponse::Mito { .. })
740    }
741
742    /// Returns true if the engine is a metric engine.
743    pub fn is_metric(&self) -> bool {
744        matches!(self, SyncRegionFromResponse::Metric { .. })
745    }
746
747    /// Returns the new opened logical region ids.
748    pub fn new_opened_logical_region_ids(self) -> Option<Vec<RegionId>> {
749        match self {
750            SyncRegionFromResponse::Metric {
751                new_opened_logical_region_ids,
752                ..
753            } => Some(new_opened_logical_region_ids),
754            _ => None,
755        }
756    }
757}
758
/// Request to remap manifests from old regions to new regions.
#[derive(Debug, Clone)]
pub struct RemapManifestsRequest {
    /// The [`RegionId`] of a staging region used to obtain table directory and storage configuration for the remap operation.
    pub region_id: RegionId,
    /// Regions to remap manifests from.
    pub input_regions: Vec<RegionId>,
    /// For each old region, which new regions should receive its files.
    pub region_mapping: HashMap<RegionId, Vec<RegionId>>,
    /// New partition expressions for the new regions, keyed by the new region's id.
    pub new_partition_exprs: HashMap<RegionId, String>,
}
771
/// Response to remap manifests from old regions to new regions.
#[derive(Debug, Clone)]
pub struct RemapManifestsResponse {
    /// Maps region id to its staging manifest path.
    ///
    /// These paths are relative paths within the central region's staging blob storage,
    /// and should be passed to [`ApplyStagingManifestRequest`](RegionRequest::ApplyStagingManifest) to finalize the repartition.
    pub manifest_paths: HashMap<RegionId, String>,
}
781
/// Request to copy files from a source region to a target region.
///
/// Only the source region is part of the request; the target region is not a field here.
#[derive(Debug, Clone)]
pub struct MitoCopyRegionFromRequest {
    /// The [`RegionId`] of the source region.
    pub source_region_id: RegionId,
    /// The parallelism of the copy operation.
    pub parallelism: usize,
}
790
/// Response to copying files from a source region to a target region.
#[derive(Debug, Clone)]
pub struct MitoCopyRegionFromResponse {
    /// The file ids that were copied from the source region to the target region.
    pub copied_file_ids: Vec<FileId>,
}
796
797#[async_trait]
798pub trait RegionEngine: Send + Sync {
    /// Returns the name of this engine.
    fn name(&self) -> &str;
801
802    /// Handles batch open region requests.
803    async fn handle_batch_open_requests(
804        &self,
805        parallelism: usize,
806        requests: Vec<(RegionId, RegionOpenRequest)>,
807    ) -> Result<BatchResponses, BoxedError> {
808        let semaphore = Arc::new(Semaphore::new(parallelism));
809        let mut tasks = Vec::with_capacity(requests.len());
810
811        for (region_id, request) in requests {
812            let semaphore_moved = semaphore.clone();
813
814            tasks.push(async move {
815                // Safety: semaphore must exist
816                let _permit = semaphore_moved.acquire().await.unwrap();
817                let result = self
818                    .handle_request(region_id, RegionRequest::Open(request))
819                    .await;
820                (region_id, result)
821            });
822        }
823
824        Ok(join_all(tasks).await)
825    }
826
827    async fn handle_batch_catchup_requests(
828        &self,
829        parallelism: usize,
830        requests: Vec<(RegionId, RegionCatchupRequest)>,
831    ) -> Result<BatchResponses, BoxedError> {
832        let semaphore = Arc::new(Semaphore::new(parallelism));
833        let mut tasks = Vec::with_capacity(requests.len());
834
835        for (region_id, request) in requests {
836            let semaphore_moved = semaphore.clone();
837
838            tasks.push(async move {
839                // Safety: semaphore must exist
840                let _permit = semaphore_moved.acquire().await.unwrap();
841                let result = self
842                    .handle_request(region_id, RegionRequest::Catchup(request))
843                    .await;
844                (region_id, result)
845            });
846        }
847
848        Ok(join_all(tasks).await)
849    }
850
851    async fn handle_batch_ddl_requests(
852        &self,
853        request: BatchRegionDdlRequest,
854    ) -> Result<RegionResponse, BoxedError> {
855        let requests = request.into_region_requests();
856
857        let mut affected_rows = 0;
858        let mut extensions = HashMap::new();
859
860        for (region_id, request) in requests {
861            let result = self.handle_request(region_id, request).await?;
862            affected_rows += result.affected_rows;
863            extensions.extend(result.extensions);
864        }
865
866        Ok(RegionResponse {
867            affected_rows,
868            extensions,
869            metadata: Vec::new(),
870        })
871    }
872
    /// Handles non-query request to the region.
    ///
    /// Returns a [`RegionResponse`], which carries the count of affected rows.
    async fn handle_request(
        &self,
        region_id: RegionId,
        request: RegionRequest,
    ) -> Result<RegionResponse, BoxedError>;
879
    /// Returns the committed sequence (sequence of latest written data) of the region.
    async fn get_committed_sequence(
        &self,
        region_id: RegionId,
    ) -> Result<SequenceNumber, BoxedError>;
885
    /// Handles a query and returns a scanner that can be used to scan the region concurrently.
    async fn handle_query(
        &self,
        region_id: RegionId,
        request: ScanRequest,
    ) -> Result<RegionScannerRef, BoxedError>;
892
    /// Returns the query memory tracker for scan execution.
    ///
    /// The default implementation returns `None`; override it to expose a tracker.
    fn query_memory_tracker(&self) -> Option<QueryMemoryTracker> {
        None
    }
897
    /// Retrieves the metadata of the region identified by `region_id`.
    async fn get_metadata(&self, region_id: RegionId) -> Result<RegionMetadataRef, BoxedError>;
900
    /// Retrieves the statistic of the region identified by `region_id`, if any.
    ///
    /// NOTE(review): presumably `None` means the region is unknown to the engine —
    /// confirm against the implementations.
    fn region_statistic(&self, region_id: RegionId) -> Option<RegionStatistic>;
903
    /// Stops the engine.
    async fn stop(&self) -> Result<(), BoxedError>;
906
    /// Sets the [RegionRole] of the region identified by `region_id`.
    ///
    /// The engine checks whether the region is writable before writing to the region.
    /// Setting the region as readonly doesn't guarantee that write operations already
    /// in progress will not take effect.
    fn set_region_role(&self, region_id: RegionId, role: RegionRole) -> Result<(), BoxedError>;
913
    /// Syncs the region manifest to the given manifest version.
    ///
    /// `region_id` identifies the region to sync; the details of the sync are
    /// carried by `request`.
    async fn sync_region(
        &self,
        region_id: RegionId,
        request: SyncRegionFromRequest,
    ) -> Result<SyncRegionFromResponse, BoxedError>;
920
    /// Remaps manifests from old regions to new regions, as described by `request`.
    async fn remap_manifests(
        &self,
        request: RemapManifestsRequest,
    ) -> Result<RemapManifestsResponse, BoxedError>;
926
    /// Gracefully sets the role state of the region identified by `region_id` to
    /// `region_role_state`.
    ///
    /// After the call returns, the engine ensures no more write operations will succeed in the region.
    ///
    /// NOTE(review): [SettableRegionRoleState] documents the staging state as still
    /// writable, so the guarantee above presumably only applies to the non-writable
    /// target states — confirm.
    async fn set_region_role_state_gracefully(
        &self,
        region_id: RegionId,
        region_role_state: SettableRegionRoleState,
    ) -> Result<SetRegionRoleStateResponse, BoxedError>;
935
    /// Indicates the role of the region identified by `region_id`.
    ///
    /// Returns `None` if the region is not found.
    fn role(&self, region_id: RegionId) -> Option<RegionRole>;
940
    /// Exposes the engine as a [`dyn Any`](Any) reference.
    ///
    /// NOTE(review): presumably intended for downcasting to the concrete engine
    /// type — confirm against callers.
    fn as_any(&self) -> &dyn Any;
942}
943
/// A shared, reference-counted handle to a [RegionEngine] trait object.
pub type RegionEngineRef = Arc<dyn RegionEngine>;
945
/// A [RegionScanner] that only scans a single partition.
///
/// The scanner wraps one already-built record batch stream. The stream is handed
/// out by the first call to `scan_partition`; later calls receive an empty stream
/// with the same schema.
pub struct SinglePartitionScanner {
    /// The wrapped stream; becomes `None` once `scan_partition` has taken it.
    stream: Mutex<Option<SendableRecordBatchStream>>,
    /// Schema of the wrapped stream, captured when the scanner is created.
    schema: SchemaRef,
    /// Scanner properties, initialized from the default with the append mode set.
    properties: ScannerProperties,
    /// Region metadata returned by `metadata()`.
    metadata: RegionMetadataRef,
    /// Snapshot sequence returned by `snapshot_sequence()`, if any.
    snapshot_sequence: Option<SequenceNumber>,
}
954
955impl SinglePartitionScanner {
956    /// Creates a new [SinglePartitionScanner] with the given stream and metadata.
957    pub fn new(
958        stream: SendableRecordBatchStream,
959        append_mode: bool,
960        metadata: RegionMetadataRef,
961        snapshot_sequence: Option<SequenceNumber>,
962    ) -> Self {
963        let schema = stream.schema();
964        Self {
965            stream: Mutex::new(Some(stream)),
966            schema,
967            properties: ScannerProperties::default().with_append_mode(append_mode),
968            metadata,
969            snapshot_sequence,
970        }
971    }
972}
973
974impl Debug for SinglePartitionScanner {
975    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
976        write!(f, "SinglePartitionScanner: <SendableRecordBatchStream>")
977    }
978}
979
980impl RegionScanner for SinglePartitionScanner {
981    fn name(&self) -> &str {
982        "SinglePartition"
983    }
984
985    fn properties(&self) -> &ScannerProperties {
986        &self.properties
987    }
988
989    fn schema(&self) -> SchemaRef {
990        self.schema.clone()
991    }
992
993    fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> {
994        self.properties.prepare(request);
995        Ok(())
996    }
997
998    fn scan_partition(
999        &self,
1000        _ctx: &QueryScanContext,
1001        _metrics_set: &ExecutionPlanMetricsSet,
1002        _partition: usize,
1003    ) -> Result<SendableRecordBatchStream, BoxedError> {
1004        let mut stream = self.stream.lock().unwrap();
1005        let result = stream
1006            .take()
1007            .or_else(|| Some(Box::pin(EmptyRecordBatchStream::new(self.schema.clone()))));
1008        Ok(result.unwrap())
1009    }
1010
1011    fn has_predicate_without_region(&self) -> bool {
1012        false
1013    }
1014
1015    fn add_dyn_filter_to_predicate(
1016        &mut self,
1017        filter_exprs: Vec<Arc<dyn datafusion_physical_plan::PhysicalExpr>>,
1018    ) -> Vec<bool> {
1019        vec![false; filter_exprs.len()]
1020    }
1021
1022    fn metadata(&self) -> RegionMetadataRef {
1023        self.metadata.clone()
1024    }
1025
1026    fn set_logical_region(&mut self, logical_region: bool) {
1027        self.properties.set_logical_region(logical_region);
1028    }
1029
1030    fn snapshot_sequence(&self) -> Option<SequenceNumber> {
1031        self.snapshot_sequence
1032    }
1033}
1034
1035impl DisplayAs for SinglePartitionScanner {
1036    fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
1037        write!(f, "{:?}", self)
1038    }
1039}