1use std::any::Any;
18use std::collections::HashMap;
19use std::fmt::{Debug, Display};
20use std::sync::{Arc, Mutex};
21
22use api::greptime_proto::v1::meta::{GrantedRegion as PbGrantedRegion, RegionRole as PbRegionRole};
23use api::region::RegionResponse;
24use async_trait::async_trait;
25use common_error::ext::BoxedError;
26use common_recordbatch::{EmptyRecordBatchStream, SendableRecordBatchStream};
27use common_time::Timestamp;
28use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
29use datafusion_physical_plan::{DisplayAs, DisplayFormatType};
30use datatypes::schema::SchemaRef;
31use futures::future::join_all;
32use serde::{Deserialize, Serialize};
33use tokio::sync::Semaphore;
34
35use crate::logstore::entry;
36use crate::metadata::RegionMetadataRef;
37use crate::region_request::{
38    BatchRegionDdlRequest, RegionCatchupRequest, RegionOpenRequest, RegionRequest,
39};
40use crate::storage::{RegionId, ScanRequest, SequenceNumber};
41
/// A role state that can be requested for a region.
///
/// Compared to [`RegionRole`], it additionally distinguishes a leader in
/// staging state.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum SettableRegionRoleState {
    /// Follower role.
    Follower,
    /// A leader that is being downgraded.
    DowngradingLeader,
    /// Plain leader role.
    Leader,
    /// A leader in staging state.
    StagingLeader,
}

impl Display for SettableRegionRoleState {
    /// Writes a fixed label; leader variants other than the plain leader carry
    /// their qualifier in parentheses.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            Self::Follower => "Follower",
            Self::Leader => "Leader",
            Self::DowngradingLeader => "Leader(Downgrading)",
            Self::StagingLeader => "Leader(Staging)",
        };
        f.write_str(label)
    }
}
63
64impl From<SettableRegionRoleState> for RegionRole {
65    fn from(value: SettableRegionRoleState) -> Self {
66        match value {
67            SettableRegionRoleState::Follower => RegionRole::Follower,
68            SettableRegionRoleState::DowngradingLeader => RegionRole::DowngradingLeader,
69            SettableRegionRoleState::Leader => RegionRole::Leader,
70            SettableRegionRoleState::StagingLeader => RegionRole::Leader, }
72    }
73}
74
75#[derive(Debug, PartialEq, Eq)]
77pub struct SetRegionRoleStateRequest {
78    region_id: RegionId,
79    region_role_state: SettableRegionRoleState,
80}
81
82#[derive(Debug, PartialEq, Eq)]
84pub enum SetRegionRoleStateSuccess {
85    File,
86    Mito {
87        last_entry_id: entry::Id,
88    },
89    Metric {
90        last_entry_id: entry::Id,
91        metadata_last_entry_id: entry::Id,
92    },
93}
94
95impl SetRegionRoleStateSuccess {
96    pub fn file() -> Self {
98        Self::File
99    }
100
101    pub fn mito(last_entry_id: entry::Id) -> Self {
103        SetRegionRoleStateSuccess::Mito { last_entry_id }
104    }
105
106    pub fn metric(last_entry_id: entry::Id, metadata_last_entry_id: entry::Id) -> Self {
108        SetRegionRoleStateSuccess::Metric {
109            last_entry_id,
110            metadata_last_entry_id,
111        }
112    }
113}
114
115impl SetRegionRoleStateSuccess {
116    pub fn last_entry_id(&self) -> Option<entry::Id> {
118        match self {
119            SetRegionRoleStateSuccess::File => None,
120            SetRegionRoleStateSuccess::Mito { last_entry_id } => Some(*last_entry_id),
121            SetRegionRoleStateSuccess::Metric { last_entry_id, .. } => Some(*last_entry_id),
122        }
123    }
124
125    pub fn metadata_last_entry_id(&self) -> Option<entry::Id> {
127        match self {
128            SetRegionRoleStateSuccess::File => None,
129            SetRegionRoleStateSuccess::Mito { .. } => None,
130            SetRegionRoleStateSuccess::Metric {
131                metadata_last_entry_id,
132                ..
133            } => Some(*metadata_last_entry_id),
134        }
135    }
136}
137
138#[derive(Debug)]
140pub enum SetRegionRoleStateResponse {
141    Success(SetRegionRoleStateSuccess),
142    NotFound,
143    InvalidTransition(BoxedError),
144}
145
146impl SetRegionRoleStateResponse {
147    pub fn success(success: SetRegionRoleStateSuccess) -> Self {
149        Self::Success(success)
150    }
151
152    pub fn invalid_transition(error: BoxedError) -> Self {
154        Self::InvalidTransition(error)
155    }
156
157    pub fn is_not_found(&self) -> bool {
159        matches!(self, SetRegionRoleStateResponse::NotFound)
160    }
161
162    pub fn is_invalid_transition(&self) -> bool {
164        matches!(self, SetRegionRoleStateResponse::InvalidTransition(_))
165    }
166}
167
168#[derive(Debug, Clone, PartialEq, Eq)]
169pub struct GrantedRegion {
170    pub region_id: RegionId,
171    pub region_role: RegionRole,
172    pub extensions: HashMap<String, Vec<u8>>,
173}
174
175impl GrantedRegion {
176    pub fn new(region_id: RegionId, region_role: RegionRole) -> Self {
177        Self {
178            region_id,
179            region_role,
180            extensions: HashMap::new(),
181        }
182    }
183}
184
185impl From<GrantedRegion> for PbGrantedRegion {
186    fn from(value: GrantedRegion) -> Self {
187        PbGrantedRegion {
188            region_id: value.region_id.as_u64(),
189            role: PbRegionRole::from(value.region_role).into(),
190            extensions: value.extensions,
191        }
192    }
193}
194
195impl From<PbGrantedRegion> for GrantedRegion {
196    fn from(value: PbGrantedRegion) -> Self {
197        GrantedRegion {
198            region_id: RegionId::from_u64(value.region_id),
199            region_role: value.role().into(),
200            extensions: value.extensions,
201        }
202    }
203}
204
205#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
208pub enum RegionRole {
209    Follower,
211    Leader,
213    DowngradingLeader,
217}
218
219impl Display for RegionRole {
220    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
221        match self {
222            RegionRole::Follower => write!(f, "Follower"),
223            RegionRole::Leader => write!(f, "Leader"),
224            RegionRole::DowngradingLeader => write!(f, "Leader(Downgrading)"),
225        }
226    }
227}
228
229impl RegionRole {
230    pub fn writable(&self) -> bool {
231        matches!(self, RegionRole::Leader)
232    }
233}
234
235impl From<RegionRole> for PbRegionRole {
236    fn from(value: RegionRole) -> Self {
237        match value {
238            RegionRole::Follower => PbRegionRole::Follower,
239            RegionRole::Leader => PbRegionRole::Leader,
240            RegionRole::DowngradingLeader => PbRegionRole::DowngradingLeader,
241        }
242    }
243}
244
245impl From<PbRegionRole> for RegionRole {
246    fn from(value: PbRegionRole) -> Self {
247        match value {
248            PbRegionRole::Leader => RegionRole::Leader,
249            PbRegionRole::Follower => RegionRole::Follower,
250            PbRegionRole::DowngradingLeader => RegionRole::DowngradingLeader,
251        }
252    }
253}
254
/// Describes how a scanner's output is partitioned.
#[derive(Debug)]
pub enum ScannerPartitioning {
    /// Partitioning whose scheme is not described further; only the number of
    /// partitions is known.
    Unknown(usize),
}

impl ScannerPartitioning {
    /// Returns the number of partitions.
    pub fn num_partitions(&self) -> usize {
        let ScannerPartitioning::Unknown(count) = self;
        *count
    }
}
270
/// A time range of rows that a scanner reads as a unit.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct PartitionRange {
    /// Start of the time range.
    /// NOTE(review): inclusivity is not visible here — confirm with producers.
    pub start: Timestamp,
    /// End of the time range.
    /// NOTE(review): presumably exclusive — confirm with producers.
    pub end: Timestamp,
    /// Number of rows in this range (presumably an estimate — verify).
    pub num_rows: usize,
    /// Identifier of this range; presumably unique within one scan — verify.
    pub identifier: usize,
}
283
284#[derive(Debug, Default)]
286pub struct ScannerProperties {
287    pub partitions: Vec<Vec<PartitionRange>>,
292
293    append_mode: bool,
295
296    total_rows: usize,
299
300    pub distinguish_partition_range: bool,
302
303    target_partitions: usize,
305
306    logical_region: bool,
308}
309
310impl ScannerProperties {
311    pub fn with_append_mode(mut self, append_mode: bool) -> Self {
313        self.append_mode = append_mode;
314        self
315    }
316
317    pub fn with_total_rows(mut self, total_rows: usize) -> Self {
319        self.total_rows = total_rows;
320        self
321    }
322
323    pub fn new(partitions: Vec<Vec<PartitionRange>>, append_mode: bool, total_rows: usize) -> Self {
325        Self {
326            partitions,
327            append_mode,
328            total_rows,
329            distinguish_partition_range: false,
330            target_partitions: 0,
331            logical_region: false,
332        }
333    }
334
335    pub fn prepare(&mut self, request: PrepareRequest) {
337        if let Some(ranges) = request.ranges {
338            self.partitions = ranges;
339        }
340        if let Some(distinguish_partition_range) = request.distinguish_partition_range {
341            self.distinguish_partition_range = distinguish_partition_range;
342        }
343        if let Some(target_partitions) = request.target_partitions {
344            self.target_partitions = target_partitions;
345        }
346    }
347
348    pub fn num_partitions(&self) -> usize {
350        self.partitions.len()
351    }
352
353    pub fn append_mode(&self) -> bool {
354        self.append_mode
355    }
356
357    pub fn total_rows(&self) -> usize {
358        self.total_rows
359    }
360
361    pub fn is_logical_region(&self) -> bool {
363        self.logical_region
364    }
365
366    pub fn target_partitions(&self) -> usize {
368        if self.target_partitions == 0 {
369            self.num_partitions()
370        } else {
371            self.target_partitions
372        }
373    }
374
375    pub fn set_logical_region(&mut self, logical_region: bool) {
377        self.logical_region = logical_region;
378    }
379}
380
381#[derive(Default)]
383pub struct PrepareRequest {
384    pub ranges: Option<Vec<Vec<PartitionRange>>>,
386    pub distinguish_partition_range: Option<bool>,
388    pub target_partitions: Option<usize>,
390}
391
392impl PrepareRequest {
393    pub fn with_ranges(mut self, ranges: Vec<Vec<PartitionRange>>) -> Self {
395        self.ranges = Some(ranges);
396        self
397    }
398
399    pub fn with_distinguish_partition_range(mut self, distinguish_partition_range: bool) -> Self {
401        self.distinguish_partition_range = Some(distinguish_partition_range);
402        self
403    }
404
405    pub fn with_target_partitions(mut self, target_partitions: usize) -> Self {
407        self.target_partitions = Some(target_partitions);
408        self
409    }
410}
411
/// Query-level context passed to `RegionScanner::scan_partition`.
#[derive(Clone, Default)]
pub struct QueryScanContext {
    /// Whether verbose explain output is requested.
    /// NOTE(review): presumably set for `EXPLAIN VERBOSE` queries — confirm with callers.
    pub explain_verbose: bool,
}
418
/// A scanner over a region whose output may be split into partitions.
pub trait RegionScanner: Debug + DisplayAs + Send {
    /// Returns the properties of this scanner.
    fn properties(&self) -> &ScannerProperties;

    /// Returns the schema of the record batches this scanner produces.
    fn schema(&self) -> SchemaRef;

    /// Returns the metadata of the scanned region.
    fn metadata(&self) -> RegionMetadataRef;

    /// Applies the hints in `request` before scanning.
    ///
    /// # Errors
    /// Returns an error if the scanner cannot apply the request.
    fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError>;

    /// Returns a stream over the given `partition`.
    ///
    /// NOTE(review): calling this twice for the same partition is not
    /// guaranteed to yield the same data — e.g. `SinglePartitionScanner` hands
    /// out its stream once and returns an empty stream afterwards. Confirm the
    /// intended contract.
    fn scan_partition(
        &self,
        ctx: &QueryScanContext,
        metrics_set: &ExecutionPlanMetricsSet,
        partition: usize,
    ) -> Result<SendableRecordBatchStream, BoxedError>;

    /// NOTE(review): semantics not visible here; presumably reports whether the
    /// scan has a predicate that does not involve the region — confirm.
    fn has_predicate_without_region(&self) -> bool;

    /// Marks whether this scanner scans a logical region.
    fn set_logical_region(&mut self, logical_region: bool);
}
455
/// A boxed, dynamically dispatched [`RegionScanner`].
pub type RegionScannerRef = Box<dyn RegionScanner>;

/// Per-region results of a batch operation, each paired with its region id.
pub type BatchResponses = Vec<(RegionId, Result<RegionResponse, BoxedError>)>;
459
/// Statistics of a region, exchanged as JSON.
///
/// Fields marked `#[serde(default)]` may be absent from the input; the others
/// (`memtable_size`, `wal_size`, `manifest_size`, `sst_size`, `sst_num`) are
/// required when deserializing.
/// NOTE(review): sizes are presumably in bytes — confirm with producers.
#[derive(Debug, Deserialize, Serialize, Default)]
pub struct RegionStatistic {
    /// Number of rows in the region.
    #[serde(default)]
    pub num_rows: u64,
    /// Size of the memtable.
    pub memtable_size: u64,
    /// Size of the WAL.
    pub wal_size: u64,
    /// Size of the manifest.
    pub manifest_size: u64,
    /// Size of the SST data files.
    pub sst_size: u64,
    /// Number of SST files.
    pub sst_num: u64,
    /// Size of the SST index files.
    #[serde(default)]
    pub index_size: u64,
    /// Manifest progress of the region.
    #[serde(default)]
    pub manifest: RegionManifestInfo,
    /// Bytes written to the region (presumably since it was opened — verify).
    #[serde(default)]
    pub written_bytes: u64,
    /// Latest entry id of the data topic.
    #[serde(default)]
    pub data_topic_latest_entry_id: u64,
    /// Latest entry id of the metadata topic.
    #[serde(default)]
    pub metadata_topic_latest_entry_id: u64,
}
493
494#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
496pub enum RegionManifestInfo {
497    Mito {
498        manifest_version: u64,
499        flushed_entry_id: u64,
500    },
501    Metric {
502        data_manifest_version: u64,
503        data_flushed_entry_id: u64,
504        metadata_manifest_version: u64,
505        metadata_flushed_entry_id: u64,
506    },
507}
508
509impl RegionManifestInfo {
510    pub fn mito(manifest_version: u64, flushed_entry_id: u64) -> Self {
512        Self::Mito {
513            manifest_version,
514            flushed_entry_id,
515        }
516    }
517
518    pub fn metric(
520        data_manifest_version: u64,
521        data_flushed_entry_id: u64,
522        metadata_manifest_version: u64,
523        metadata_flushed_entry_id: u64,
524    ) -> Self {
525        Self::Metric {
526            data_manifest_version,
527            data_flushed_entry_id,
528            metadata_manifest_version,
529            metadata_flushed_entry_id,
530        }
531    }
532
533    pub fn is_mito(&self) -> bool {
535        matches!(self, RegionManifestInfo::Mito { .. })
536    }
537
538    pub fn is_metric(&self) -> bool {
540        matches!(self, RegionManifestInfo::Metric { .. })
541    }
542
543    pub fn data_flushed_entry_id(&self) -> u64 {
545        match self {
546            RegionManifestInfo::Mito {
547                flushed_entry_id, ..
548            } => *flushed_entry_id,
549            RegionManifestInfo::Metric {
550                data_flushed_entry_id,
551                ..
552            } => *data_flushed_entry_id,
553        }
554    }
555
556    pub fn data_manifest_version(&self) -> u64 {
558        match self {
559            RegionManifestInfo::Mito {
560                manifest_version, ..
561            } => *manifest_version,
562            RegionManifestInfo::Metric {
563                data_manifest_version,
564                ..
565            } => *data_manifest_version,
566        }
567    }
568
569    pub fn metadata_manifest_version(&self) -> Option<u64> {
571        match self {
572            RegionManifestInfo::Mito { .. } => None,
573            RegionManifestInfo::Metric {
574                metadata_manifest_version,
575                ..
576            } => Some(*metadata_manifest_version),
577        }
578    }
579
580    pub fn metadata_flushed_entry_id(&self) -> Option<u64> {
582        match self {
583            RegionManifestInfo::Mito { .. } => None,
584            RegionManifestInfo::Metric {
585                metadata_flushed_entry_id,
586                ..
587            } => Some(*metadata_flushed_entry_id),
588        }
589    }
590
591    pub fn encode_list(manifest_infos: &[(RegionId, Self)]) -> serde_json::Result<Vec<u8>> {
593        serde_json::to_vec(manifest_infos)
594    }
595
596    pub fn decode_list(value: &[u8]) -> serde_json::Result<Vec<(RegionId, Self)>> {
598        serde_json::from_slice(value)
599    }
600}
601
602impl Default for RegionManifestInfo {
603    fn default() -> Self {
604        Self::Mito {
605            manifest_version: 0,
606            flushed_entry_id: 0,
607        }
608    }
609}
610
611impl RegionStatistic {
612    pub fn deserialize_from_slice(value: &[u8]) -> Option<RegionStatistic> {
616        serde_json::from_slice(value).ok()
617    }
618
619    pub fn serialize_to_vec(&self) -> Option<Vec<u8>> {
623        serde_json::to_vec(self).ok()
624    }
625}
626
627impl RegionStatistic {
628    pub fn estimated_disk_size(&self) -> u64 {
630        self.wal_size + self.sst_size + self.manifest_size + self.index_size
631    }
632}
633
634#[derive(Debug)]
636pub enum SyncManifestResponse {
637    NotSupported,
638    Mito {
639        synced: bool,
641    },
642    Metric {
643        metadata_synced: bool,
645        data_synced: bool,
647        new_opened_logical_region_ids: Vec<RegionId>,
650    },
651}
652
653impl SyncManifestResponse {
654    pub fn is_data_synced(&self) -> bool {
656        match self {
657            SyncManifestResponse::NotSupported => false,
658            SyncManifestResponse::Mito { synced } => *synced,
659            SyncManifestResponse::Metric { data_synced, .. } => *data_synced,
660        }
661    }
662
663    pub fn is_supported(&self) -> bool {
665        matches!(self, SyncManifestResponse::NotSupported)
666    }
667
668    pub fn is_mito(&self) -> bool {
670        matches!(self, SyncManifestResponse::Mito { .. })
671    }
672
673    pub fn is_metric(&self) -> bool {
675        matches!(self, SyncManifestResponse::Metric { .. })
676    }
677
678    pub fn new_opened_logical_region_ids(self) -> Option<Vec<RegionId>> {
680        match self {
681            SyncManifestResponse::Metric {
682                new_opened_logical_region_ids,
683                ..
684            } => Some(new_opened_logical_region_ids),
685            _ => None,
686        }
687    }
688}
689
690#[async_trait]
691pub trait RegionEngine: Send + Sync {
692    fn name(&self) -> &str;
694
695    async fn handle_batch_open_requests(
697        &self,
698        parallelism: usize,
699        requests: Vec<(RegionId, RegionOpenRequest)>,
700    ) -> Result<BatchResponses, BoxedError> {
701        let semaphore = Arc::new(Semaphore::new(parallelism));
702        let mut tasks = Vec::with_capacity(requests.len());
703
704        for (region_id, request) in requests {
705            let semaphore_moved = semaphore.clone();
706
707            tasks.push(async move {
708                let _permit = semaphore_moved.acquire().await.unwrap();
710                let result = self
711                    .handle_request(region_id, RegionRequest::Open(request))
712                    .await;
713                (region_id, result)
714            });
715        }
716
717        Ok(join_all(tasks).await)
718    }
719
720    async fn handle_batch_catchup_requests(
721        &self,
722        parallelism: usize,
723        requests: Vec<(RegionId, RegionCatchupRequest)>,
724    ) -> Result<BatchResponses, BoxedError> {
725        let semaphore = Arc::new(Semaphore::new(parallelism));
726        let mut tasks = Vec::with_capacity(requests.len());
727
728        for (region_id, request) in requests {
729            let semaphore_moved = semaphore.clone();
730
731            tasks.push(async move {
732                let _permit = semaphore_moved.acquire().await.unwrap();
734                let result = self
735                    .handle_request(region_id, RegionRequest::Catchup(request))
736                    .await;
737                (region_id, result)
738            });
739        }
740
741        Ok(join_all(tasks).await)
742    }
743
744    async fn handle_batch_ddl_requests(
745        &self,
746        request: BatchRegionDdlRequest,
747    ) -> Result<RegionResponse, BoxedError> {
748        let requests = request.into_region_requests();
749
750        let mut affected_rows = 0;
751        let mut extensions = HashMap::new();
752
753        for (region_id, request) in requests {
754            let result = self.handle_request(region_id, request).await?;
755            affected_rows += result.affected_rows;
756            extensions.extend(result.extensions);
757        }
758
759        Ok(RegionResponse {
760            affected_rows,
761            extensions,
762            metadata: Vec::new(),
763        })
764    }
765
766    async fn handle_request(
768        &self,
769        region_id: RegionId,
770        request: RegionRequest,
771    ) -> Result<RegionResponse, BoxedError>;
772
773    async fn get_committed_sequence(
775        &self,
776        region_id: RegionId,
777    ) -> Result<SequenceNumber, BoxedError>;
778
779    async fn handle_query(
781        &self,
782        region_id: RegionId,
783        request: ScanRequest,
784    ) -> Result<RegionScannerRef, BoxedError>;
785
786    async fn get_metadata(&self, region_id: RegionId) -> Result<RegionMetadataRef, BoxedError>;
788
789    fn region_statistic(&self, region_id: RegionId) -> Option<RegionStatistic>;
791
792    async fn stop(&self) -> Result<(), BoxedError>;
794
795    fn set_region_role(&self, region_id: RegionId, role: RegionRole) -> Result<(), BoxedError>;
801
802    async fn sync_region(
804        &self,
805        region_id: RegionId,
806        manifest_info: RegionManifestInfo,
807    ) -> Result<SyncManifestResponse, BoxedError>;
808
809    async fn set_region_role_state_gracefully(
813        &self,
814        region_id: RegionId,
815        region_role_state: SettableRegionRoleState,
816    ) -> Result<SetRegionRoleStateResponse, BoxedError>;
817
818    fn role(&self, region_id: RegionId) -> Option<RegionRole>;
822
823    fn as_any(&self) -> &dyn Any;
824}
825
/// A shared, dynamically dispatched [`RegionEngine`].
pub type RegionEngineRef = Arc<dyn RegionEngine>;
827
828pub struct SinglePartitionScanner {
830    stream: Mutex<Option<SendableRecordBatchStream>>,
831    schema: SchemaRef,
832    properties: ScannerProperties,
833    metadata: RegionMetadataRef,
834}
835
836impl SinglePartitionScanner {
837    pub fn new(
839        stream: SendableRecordBatchStream,
840        append_mode: bool,
841        metadata: RegionMetadataRef,
842    ) -> Self {
843        let schema = stream.schema();
844        Self {
845            stream: Mutex::new(Some(stream)),
846            schema,
847            properties: ScannerProperties::default().with_append_mode(append_mode),
848            metadata,
849        }
850    }
851}
852
853impl Debug for SinglePartitionScanner {
854    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
855        write!(f, "SinglePartitionScanner: <SendableRecordBatchStream>")
856    }
857}
858
859impl RegionScanner for SinglePartitionScanner {
860    fn properties(&self) -> &ScannerProperties {
861        &self.properties
862    }
863
864    fn schema(&self) -> SchemaRef {
865        self.schema.clone()
866    }
867
868    fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> {
869        self.properties.prepare(request);
870        Ok(())
871    }
872
873    fn scan_partition(
874        &self,
875        _ctx: &QueryScanContext,
876        _metrics_set: &ExecutionPlanMetricsSet,
877        _partition: usize,
878    ) -> Result<SendableRecordBatchStream, BoxedError> {
879        let mut stream = self.stream.lock().unwrap();
880        let result = stream
881            .take()
882            .or_else(|| Some(Box::pin(EmptyRecordBatchStream::new(self.schema.clone()))));
883        Ok(result.unwrap())
884    }
885
886    fn has_predicate_without_region(&self) -> bool {
887        false
888    }
889
890    fn metadata(&self) -> RegionMetadataRef {
891        self.metadata.clone()
892    }
893
894    fn set_logical_region(&mut self, logical_region: bool) {
895        self.properties.set_logical_region(logical_region);
896    }
897}
898
899impl DisplayAs for SinglePartitionScanner {
900    fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
901        write!(f, "{:?}", self)
902    }
903}