1use std::any::Any;
18use std::collections::HashMap;
19use std::fmt::{Debug, Display};
20use std::sync::{Arc, Mutex};
21
22use api::greptime_proto::v1::meta::{GrantedRegion as PbGrantedRegion, RegionRole as PbRegionRole};
23use api::region::RegionResponse;
24use async_trait::async_trait;
25use common_error::ext::BoxedError;
26use common_recordbatch::{EmptyRecordBatchStream, MemoryPermit, SendableRecordBatchStream};
27use common_time::Timestamp;
28use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
29use datafusion_physical_plan::{DisplayAs, DisplayFormatType};
30use datatypes::schema::SchemaRef;
31use futures::future::join_all;
32use serde::{Deserialize, Serialize};
33use tokio::sync::Semaphore;
34
35use crate::logstore::entry;
36use crate::metadata::RegionMetadataRef;
37use crate::region_request::{
38 BatchRegionDdlRequest, RegionCatchupRequest, RegionOpenRequest, RegionRequest,
39};
40use crate::storage::{RegionId, ScanRequest, SequenceNumber};
41
/// Role state that can be requested for a region through
/// `RegionEngine::set_region_role_state_gracefully`.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum SettableRegionRoleState {
    /// The region acts as a follower.
    Follower,
    /// The region is a leader that is being downgraded (displayed as `Leader(Downgrading)`).
    DowngradingLeader,
    /// The region acts as a leader.
    Leader,
    /// The region is a leader in staging state (displayed as `Leader(Staging)`).
    /// It converts to a plain `RegionRole::Leader`.
    StagingLeader,
}
52
53impl Display for SettableRegionRoleState {
54 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
55 match self {
56 SettableRegionRoleState::Follower => write!(f, "Follower"),
57 SettableRegionRoleState::DowngradingLeader => write!(f, "Leader(Downgrading)"),
58 SettableRegionRoleState::Leader => write!(f, "Leader"),
59 SettableRegionRoleState::StagingLeader => write!(f, "Leader(Staging)"),
60 }
61 }
62}
63
64impl From<SettableRegionRoleState> for RegionRole {
65 fn from(value: SettableRegionRoleState) -> Self {
66 match value {
67 SettableRegionRoleState::Follower => RegionRole::Follower,
68 SettableRegionRoleState::DowngradingLeader => RegionRole::DowngradingLeader,
69 SettableRegionRoleState::Leader => RegionRole::Leader,
70 SettableRegionRoleState::StagingLeader => RegionRole::Leader, }
72 }
73}
74
/// Request to change the role state of a single region.
#[derive(Debug, PartialEq, Eq)]
pub struct SetRegionRoleStateRequest {
    /// The region whose role state should change.
    region_id: RegionId,
    /// The requested role state.
    region_role_state: SettableRegionRoleState,
}
81
/// Successful outcome of setting a region's role state.
///
/// Each variant carries the entry ids its kind of region reports: none for
/// `File`, one for `Mito`, and a data plus a metadata entry id for `Metric`.
#[derive(Debug, PartialEq, Eq)]
pub enum SetRegionRoleStateSuccess {
    /// Outcome without any entry id.
    File,
    /// Outcome carrying the region's last entry id.
    Mito {
        last_entry_id: entry::Id,
    },
    /// Outcome carrying the last entry id and the metadata last entry id.
    Metric {
        last_entry_id: entry::Id,
        metadata_last_entry_id: entry::Id,
    },
}
94
95impl SetRegionRoleStateSuccess {
96 pub fn file() -> Self {
98 Self::File
99 }
100
101 pub fn mito(last_entry_id: entry::Id) -> Self {
103 SetRegionRoleStateSuccess::Mito { last_entry_id }
104 }
105
106 pub fn metric(last_entry_id: entry::Id, metadata_last_entry_id: entry::Id) -> Self {
108 SetRegionRoleStateSuccess::Metric {
109 last_entry_id,
110 metadata_last_entry_id,
111 }
112 }
113}
114
115impl SetRegionRoleStateSuccess {
116 pub fn last_entry_id(&self) -> Option<entry::Id> {
118 match self {
119 SetRegionRoleStateSuccess::File => None,
120 SetRegionRoleStateSuccess::Mito { last_entry_id } => Some(*last_entry_id),
121 SetRegionRoleStateSuccess::Metric { last_entry_id, .. } => Some(*last_entry_id),
122 }
123 }
124
125 pub fn metadata_last_entry_id(&self) -> Option<entry::Id> {
127 match self {
128 SetRegionRoleStateSuccess::File => None,
129 SetRegionRoleStateSuccess::Mito { .. } => None,
130 SetRegionRoleStateSuccess::Metric {
131 metadata_last_entry_id,
132 ..
133 } => Some(*metadata_last_entry_id),
134 }
135 }
136}
137
/// Result of a graceful region role state change
/// (`RegionEngine::set_region_role_state_gracefully`).
#[derive(Debug)]
pub enum SetRegionRoleStateResponse {
    /// The state was set; carries the engine-specific success details.
    Success(SetRegionRoleStateSuccess),
    /// No such region (as the variant name suggests — confirm against engines).
    NotFound,
    /// The requested transition was rejected; carries the reason.
    InvalidTransition(BoxedError),
}
145
146impl SetRegionRoleStateResponse {
147 pub fn success(success: SetRegionRoleStateSuccess) -> Self {
149 Self::Success(success)
150 }
151
152 pub fn invalid_transition(error: BoxedError) -> Self {
154 Self::InvalidTransition(error)
155 }
156
157 pub fn is_not_found(&self) -> bool {
159 matches!(self, SetRegionRoleStateResponse::NotFound)
160 }
161
162 pub fn is_invalid_transition(&self) -> bool {
164 matches!(self, SetRegionRoleStateResponse::InvalidTransition(_))
165 }
166}
167
/// A region together with the role granted to it; converts to and from the
/// protobuf `GrantedRegion`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct GrantedRegion {
    /// The granted region.
    pub region_id: RegionId,
    /// The role granted to the region.
    pub region_role: RegionRole,
    /// Opaque key/value extensions, passed through the protobuf conversion unchanged.
    pub extensions: HashMap<String, Vec<u8>>,
}
174
175impl GrantedRegion {
176 pub fn new(region_id: RegionId, region_role: RegionRole) -> Self {
177 Self {
178 region_id,
179 region_role,
180 extensions: HashMap::new(),
181 }
182 }
183}
184
185impl From<GrantedRegion> for PbGrantedRegion {
186 fn from(value: GrantedRegion) -> Self {
187 PbGrantedRegion {
188 region_id: value.region_id.as_u64(),
189 role: PbRegionRole::from(value.region_role).into(),
190 extensions: value.extensions,
191 }
192 }
193}
194
195impl From<PbGrantedRegion> for GrantedRegion {
196 fn from(value: PbGrantedRegion) -> Self {
197 GrantedRegion {
198 region_id: RegionId::from_u64(value.region_id),
199 region_role: value.role().into(),
200 extensions: value.extensions,
201 }
202 }
203}
204
/// Role of a region. Only `Leader` is writable (see `RegionRole::writable`).
///
/// NOTE: variant order is kept as-is; it may matter for serde formats that
/// encode enum variants by index.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum RegionRole {
    /// Follower role; not writable.
    Follower,
    /// Leader role; the only writable role.
    Leader,
    /// Leader being downgraded (displayed as `Leader(Downgrading)`); not writable.
    DowngradingLeader,
}
218
219impl Display for RegionRole {
220 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
221 match self {
222 RegionRole::Follower => write!(f, "Follower"),
223 RegionRole::Leader => write!(f, "Leader"),
224 RegionRole::DowngradingLeader => write!(f, "Leader(Downgrading)"),
225 }
226 }
227}
228
229impl RegionRole {
230 pub fn writable(&self) -> bool {
231 matches!(self, RegionRole::Leader)
232 }
233}
234
235impl From<RegionRole> for PbRegionRole {
236 fn from(value: RegionRole) -> Self {
237 match value {
238 RegionRole::Follower => PbRegionRole::Follower,
239 RegionRole::Leader => PbRegionRole::Leader,
240 RegionRole::DowngradingLeader => PbRegionRole::DowngradingLeader,
241 }
242 }
243}
244
245impl From<PbRegionRole> for RegionRole {
246 fn from(value: PbRegionRole) -> Self {
247 match value {
248 PbRegionRole::Leader => RegionRole::Leader,
249 PbRegionRole::Follower => RegionRole::Follower,
250 PbRegionRole::DowngradingLeader => RegionRole::DowngradingLeader,
251 }
252 }
253}
254
/// Partitioning of a scanner's output.
#[derive(Debug)]
pub enum ScannerPartitioning {
    /// Partitioning scheme is not known; only the number of partitions is.
    Unknown(usize),
}
261
262impl ScannerPartitioning {
263 pub fn num_partitions(&self) -> usize {
265 match self {
266 ScannerPartitioning::Unknown(num_partitions) => *num_partitions,
267 }
268 }
269}
270
/// A time range of rows assigned to a scanner partition.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct PartitionRange {
    /// Start timestamp of the range.
    pub start: Timestamp,
    /// End timestamp of the range (NOTE(review): inclusive/exclusive is not visible here — confirm).
    pub end: Timestamp,
    /// Number of rows in the range (possibly an estimate — confirm with producers).
    pub num_rows: usize,
    /// Identifier of the range.
    pub identifier: usize,
}
283
/// Properties of a [`RegionScanner`], adjustable through `ScannerProperties::prepare`.
#[derive(Debug, Default)]
pub struct ScannerProperties {
    /// Ranges for each partition; the outer length is the number of partitions.
    pub partitions: Vec<Vec<PartitionRange>>,

    /// Whether the scanned region is in append mode.
    append_mode: bool,

    /// Total number of rows reported for the scan.
    total_rows: usize,

    /// Whether partition ranges should be distinguished; set via `PrepareRequest`.
    pub distinguish_partition_range: bool,

    /// Requested number of target partitions; `0` means "use the number of partitions".
    target_partitions: usize,

    /// Whether the scanner scans a logical region.
    logical_region: bool,
}
309
310impl ScannerProperties {
311 pub fn with_append_mode(mut self, append_mode: bool) -> Self {
313 self.append_mode = append_mode;
314 self
315 }
316
317 pub fn with_total_rows(mut self, total_rows: usize) -> Self {
319 self.total_rows = total_rows;
320 self
321 }
322
323 pub fn new(partitions: Vec<Vec<PartitionRange>>, append_mode: bool, total_rows: usize) -> Self {
325 Self {
326 partitions,
327 append_mode,
328 total_rows,
329 distinguish_partition_range: false,
330 target_partitions: 0,
331 logical_region: false,
332 }
333 }
334
335 pub fn prepare(&mut self, request: PrepareRequest) {
337 if let Some(ranges) = request.ranges {
338 self.partitions = ranges;
339 }
340 if let Some(distinguish_partition_range) = request.distinguish_partition_range {
341 self.distinguish_partition_range = distinguish_partition_range;
342 }
343 if let Some(target_partitions) = request.target_partitions {
344 self.target_partitions = target_partitions;
345 }
346 }
347
348 pub fn num_partitions(&self) -> usize {
350 self.partitions.len()
351 }
352
353 pub fn append_mode(&self) -> bool {
354 self.append_mode
355 }
356
357 pub fn total_rows(&self) -> usize {
358 self.total_rows
359 }
360
361 pub fn is_logical_region(&self) -> bool {
363 self.logical_region
364 }
365
366 pub fn target_partitions(&self) -> usize {
368 if self.target_partitions == 0 {
369 self.num_partitions()
370 } else {
371 self.target_partitions
372 }
373 }
374
375 pub fn set_logical_region(&mut self, logical_region: bool) {
377 self.logical_region = logical_region;
378 }
379}
380
/// Optional overrides applied to [`ScannerProperties`] by `ScannerProperties::prepare`.
/// A `None` field leaves the corresponding property unchanged.
#[derive(Default)]
pub struct PrepareRequest {
    /// New partition ranges (replaces `ScannerProperties::partitions`).
    pub ranges: Option<Vec<Vec<PartitionRange>>>,
    /// New value for `ScannerProperties::distinguish_partition_range`.
    pub distinguish_partition_range: Option<bool>,
    /// New target partition count.
    pub target_partitions: Option<usize>,
}
391
392impl PrepareRequest {
393 pub fn with_ranges(mut self, ranges: Vec<Vec<PartitionRange>>) -> Self {
395 self.ranges = Some(ranges);
396 self
397 }
398
399 pub fn with_distinguish_partition_range(mut self, distinguish_partition_range: bool) -> Self {
401 self.distinguish_partition_range = Some(distinguish_partition_range);
402 self
403 }
404
405 pub fn with_target_partitions(mut self, target_partitions: usize) -> Self {
407 self.target_partitions = Some(target_partitions);
408 self
409 }
410}
411
/// Query-level context passed to `RegionScanner::scan_partition`.
#[derive(Clone, Default)]
pub struct QueryScanContext {
    /// Whether the query is an `EXPLAIN VERBOSE` (as the name suggests — confirm with callers).
    pub explain_verbose: bool,
}
418
/// A scanner over a region that produces record batch streams per partition.
pub trait RegionScanner: Debug + DisplayAs + Send {
    /// Returns the properties of this scanner.
    fn properties(&self) -> &ScannerProperties;

    /// Returns the schema of the record batches this scanner produces.
    fn schema(&self) -> SchemaRef;

    /// Returns the metadata of the scanned region.
    fn metadata(&self) -> RegionMetadataRef;

    /// Applies a [`PrepareRequest`] (e.g. new partition ranges) to this scanner.
    fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError>;

    /// Returns the record batch stream for `partition`.
    ///
    /// `metrics_set` is a DataFusion execution-plan metrics set; NOTE(review):
    /// how implementations report into it is not visible here.
    fn scan_partition(
        &self,
        ctx: &QueryScanContext,
        metrics_set: &ExecutionPlanMetricsSet,
        partition: usize,
    ) -> Result<SendableRecordBatchStream, BoxedError>;

    /// Returns whether the scan has a predicate that does not involve the region.
    /// NOTE(review): exact semantics not visible here; `SinglePartitionScanner`
    /// always returns `false`.
    fn has_predicate_without_region(&self) -> bool;

    /// Marks whether this scanner scans a logical region.
    fn set_logical_region(&mut self, logical_region: bool);
}
455
/// Boxed, dynamically dispatched [`RegionScanner`].
pub type RegionScannerRef = Box<dyn RegionScanner>;

/// Per-region results of a batch operation, one `(region_id, result)` pair per request.
pub type BatchResponses = Vec<(RegionId, Result<RegionResponse, BoxedError>)>;
459
/// Statistics of a region. JSON (de)serialization is provided by
/// `RegionStatistic::serialize_to_vec` / `RegionStatistic::deserialize_from_slice`.
///
/// Fields marked `#[serde(default)]` fall back to their default when missing
/// from the input; the others are required.
#[derive(Debug, Deserialize, Serialize, Default)]
pub struct RegionStatistic {
    /// Number of rows.
    #[serde(default)]
    pub num_rows: u64,
    /// Memtable size (presumably bytes — TODO confirm).
    pub memtable_size: u64,
    /// WAL size; included in `estimated_disk_size`.
    pub wal_size: u64,
    /// Manifest size; included in `estimated_disk_size`.
    pub manifest_size: u64,
    /// SST size; included in `estimated_disk_size`.
    pub sst_size: u64,
    /// Number of SST files.
    #[serde(default)]
    pub sst_num: u64,
    /// Index size; included in `estimated_disk_size`.
    #[serde(default)]
    pub index_size: u64,
    /// Manifest information; defaults to an empty Mito manifest.
    #[serde(default)]
    pub manifest: RegionManifestInfo,
    /// Bytes written to the region.
    #[serde(default)]
    pub written_bytes: u64,
    /// Latest entry id of the data topic (presumably the WAL topic — confirm).
    #[serde(default)]
    pub data_topic_latest_entry_id: u64,
    /// Latest entry id of the metadata topic.
    #[serde(default)]
    pub metadata_topic_latest_entry_id: u64,
}
493
/// Manifest information of a region, one variant per kind of region.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum RegionManifestInfo {
    /// Manifest info of a Mito region.
    Mito {
        /// Manifest version.
        manifest_version: u64,
        /// Flushed entry id.
        flushed_entry_id: u64,
        /// Count of removed files.
        file_removed_cnt: u64,
    },
    /// Manifest info of a Metric region, which tracks separate data and metadata manifests.
    Metric {
        /// Version of the data manifest.
        data_manifest_version: u64,
        /// Flushed entry id of the data part.
        data_flushed_entry_id: u64,
        /// Version of the metadata manifest.
        metadata_manifest_version: u64,
        /// Flushed entry id of the metadata part.
        metadata_flushed_entry_id: u64,
    },
}
510
511impl RegionManifestInfo {
512 pub fn mito(manifest_version: u64, flushed_entry_id: u64, file_removal_rate: u64) -> Self {
514 Self::Mito {
515 manifest_version,
516 flushed_entry_id,
517 file_removed_cnt: file_removal_rate,
518 }
519 }
520
521 pub fn metric(
523 data_manifest_version: u64,
524 data_flushed_entry_id: u64,
525 metadata_manifest_version: u64,
526 metadata_flushed_entry_id: u64,
527 ) -> Self {
528 Self::Metric {
529 data_manifest_version,
530 data_flushed_entry_id,
531 metadata_manifest_version,
532 metadata_flushed_entry_id,
533 }
534 }
535
536 pub fn is_mito(&self) -> bool {
538 matches!(self, RegionManifestInfo::Mito { .. })
539 }
540
541 pub fn is_metric(&self) -> bool {
543 matches!(self, RegionManifestInfo::Metric { .. })
544 }
545
546 pub fn data_flushed_entry_id(&self) -> u64 {
548 match self {
549 RegionManifestInfo::Mito {
550 flushed_entry_id, ..
551 } => *flushed_entry_id,
552 RegionManifestInfo::Metric {
553 data_flushed_entry_id,
554 ..
555 } => *data_flushed_entry_id,
556 }
557 }
558
559 pub fn data_manifest_version(&self) -> u64 {
561 match self {
562 RegionManifestInfo::Mito {
563 manifest_version, ..
564 } => *manifest_version,
565 RegionManifestInfo::Metric {
566 data_manifest_version,
567 ..
568 } => *data_manifest_version,
569 }
570 }
571
572 pub fn metadata_manifest_version(&self) -> Option<u64> {
574 match self {
575 RegionManifestInfo::Mito { .. } => None,
576 RegionManifestInfo::Metric {
577 metadata_manifest_version,
578 ..
579 } => Some(*metadata_manifest_version),
580 }
581 }
582
583 pub fn metadata_flushed_entry_id(&self) -> Option<u64> {
585 match self {
586 RegionManifestInfo::Mito { .. } => None,
587 RegionManifestInfo::Metric {
588 metadata_flushed_entry_id,
589 ..
590 } => Some(*metadata_flushed_entry_id),
591 }
592 }
593
594 pub fn encode_list(manifest_infos: &[(RegionId, Self)]) -> serde_json::Result<Vec<u8>> {
596 serde_json::to_vec(manifest_infos)
597 }
598
599 pub fn decode_list(value: &[u8]) -> serde_json::Result<Vec<(RegionId, Self)>> {
601 serde_json::from_slice(value)
602 }
603}
604
605impl Default for RegionManifestInfo {
606 fn default() -> Self {
607 Self::Mito {
608 manifest_version: 0,
609 flushed_entry_id: 0,
610 file_removed_cnt: 0,
611 }
612 }
613}
614
615impl RegionStatistic {
616 pub fn deserialize_from_slice(value: &[u8]) -> Option<RegionStatistic> {
620 serde_json::from_slice(value).ok()
621 }
622
623 pub fn serialize_to_vec(&self) -> Option<Vec<u8>> {
627 serde_json::to_vec(self).ok()
628 }
629}
630
631impl RegionStatistic {
632 pub fn estimated_disk_size(&self) -> u64 {
634 self.wal_size + self.sst_size + self.manifest_size + self.index_size
635 }
636}
637
/// Outcome of `RegionEngine::sync_region`.
#[derive(Debug)]
pub enum SyncManifestResponse {
    /// The engine does not support manifest syncing.
    NotSupported,
    /// Result for a Mito region.
    Mito {
        /// Whether the manifest was synced.
        synced: bool,
    },
    /// Result for a Metric region, which syncs metadata and data separately.
    Metric {
        /// Whether the metadata manifest was synced.
        metadata_synced: bool,
        /// Whether the data manifest was synced.
        data_synced: bool,
        /// Logical regions opened as part of the sync.
        new_opened_logical_region_ids: Vec<RegionId>,
    },
}
656
657impl SyncManifestResponse {
658 pub fn is_data_synced(&self) -> bool {
660 match self {
661 SyncManifestResponse::NotSupported => false,
662 SyncManifestResponse::Mito { synced } => *synced,
663 SyncManifestResponse::Metric { data_synced, .. } => *data_synced,
664 }
665 }
666
667 pub fn is_supported(&self) -> bool {
669 matches!(self, SyncManifestResponse::NotSupported)
670 }
671
672 pub fn is_mito(&self) -> bool {
674 matches!(self, SyncManifestResponse::Mito { .. })
675 }
676
677 pub fn is_metric(&self) -> bool {
679 matches!(self, SyncManifestResponse::Metric { .. })
680 }
681
682 pub fn new_opened_logical_region_ids(self) -> Option<Vec<RegionId>> {
684 match self {
685 SyncManifestResponse::Metric {
686 new_opened_logical_region_ids,
687 ..
688 } => Some(new_opened_logical_region_ids),
689 _ => None,
690 }
691 }
692}
693
694#[async_trait]
695pub trait RegionEngine: Send + Sync {
696 fn name(&self) -> &str;
698
699 async fn handle_batch_open_requests(
701 &self,
702 parallelism: usize,
703 requests: Vec<(RegionId, RegionOpenRequest)>,
704 ) -> Result<BatchResponses, BoxedError> {
705 let semaphore = Arc::new(Semaphore::new(parallelism));
706 let mut tasks = Vec::with_capacity(requests.len());
707
708 for (region_id, request) in requests {
709 let semaphore_moved = semaphore.clone();
710
711 tasks.push(async move {
712 let _permit = semaphore_moved.acquire().await.unwrap();
714 let result = self
715 .handle_request(region_id, RegionRequest::Open(request))
716 .await;
717 (region_id, result)
718 });
719 }
720
721 Ok(join_all(tasks).await)
722 }
723
724 async fn handle_batch_catchup_requests(
725 &self,
726 parallelism: usize,
727 requests: Vec<(RegionId, RegionCatchupRequest)>,
728 ) -> Result<BatchResponses, BoxedError> {
729 let semaphore = Arc::new(Semaphore::new(parallelism));
730 let mut tasks = Vec::with_capacity(requests.len());
731
732 for (region_id, request) in requests {
733 let semaphore_moved = semaphore.clone();
734
735 tasks.push(async move {
736 let _permit = semaphore_moved.acquire().await.unwrap();
738 let result = self
739 .handle_request(region_id, RegionRequest::Catchup(request))
740 .await;
741 (region_id, result)
742 });
743 }
744
745 Ok(join_all(tasks).await)
746 }
747
748 async fn handle_batch_ddl_requests(
749 &self,
750 request: BatchRegionDdlRequest,
751 ) -> Result<RegionResponse, BoxedError> {
752 let requests = request.into_region_requests();
753
754 let mut affected_rows = 0;
755 let mut extensions = HashMap::new();
756
757 for (region_id, request) in requests {
758 let result = self.handle_request(region_id, request).await?;
759 affected_rows += result.affected_rows;
760 extensions.extend(result.extensions);
761 }
762
763 Ok(RegionResponse {
764 affected_rows,
765 extensions,
766 metadata: Vec::new(),
767 })
768 }
769
770 async fn handle_request(
772 &self,
773 region_id: RegionId,
774 request: RegionRequest,
775 ) -> Result<RegionResponse, BoxedError>;
776
777 async fn get_committed_sequence(
779 &self,
780 region_id: RegionId,
781 ) -> Result<SequenceNumber, BoxedError>;
782
783 async fn handle_query(
785 &self,
786 region_id: RegionId,
787 request: ScanRequest,
788 ) -> Result<RegionScannerRef, BoxedError>;
789
790 fn register_query_memory_permit(&self) -> Option<Arc<MemoryPermit>> {
792 None
793 }
794
795 async fn get_metadata(&self, region_id: RegionId) -> Result<RegionMetadataRef, BoxedError>;
797
798 fn region_statistic(&self, region_id: RegionId) -> Option<RegionStatistic>;
800
801 async fn stop(&self) -> Result<(), BoxedError>;
803
804 fn set_region_role(&self, region_id: RegionId, role: RegionRole) -> Result<(), BoxedError>;
810
811 async fn sync_region(
813 &self,
814 region_id: RegionId,
815 manifest_info: RegionManifestInfo,
816 ) -> Result<SyncManifestResponse, BoxedError>;
817
818 async fn set_region_role_state_gracefully(
822 &self,
823 region_id: RegionId,
824 region_role_state: SettableRegionRoleState,
825 ) -> Result<SetRegionRoleStateResponse, BoxedError>;
826
827 fn role(&self, region_id: RegionId) -> Option<RegionRole>;
831
832 fn as_any(&self) -> &dyn Any;
833}
834
/// Shared, dynamically dispatched [`RegionEngine`].
pub type RegionEngineRef = Arc<dyn RegionEngine>;
836
/// A [`RegionScanner`] that wraps one pre-built record batch stream.
pub struct SinglePartitionScanner {
    /// The wrapped stream. It is taken on the first `scan_partition` call;
    /// later calls see `None` and return an empty stream.
    stream: Mutex<Option<SendableRecordBatchStream>>,
    /// Schema captured from the stream at construction time.
    schema: SchemaRef,
    /// Scanner properties; default values plus `append_mode`.
    properties: ScannerProperties,
    /// Metadata of the scanned region.
    metadata: RegionMetadataRef,
}
844
845impl SinglePartitionScanner {
846 pub fn new(
848 stream: SendableRecordBatchStream,
849 append_mode: bool,
850 metadata: RegionMetadataRef,
851 ) -> Self {
852 let schema = stream.schema();
853 Self {
854 stream: Mutex::new(Some(stream)),
855 schema,
856 properties: ScannerProperties::default().with_append_mode(append_mode),
857 metadata,
858 }
859 }
860}
861
862impl Debug for SinglePartitionScanner {
863 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
864 write!(f, "SinglePartitionScanner: <SendableRecordBatchStream>")
865 }
866}
867
868impl RegionScanner for SinglePartitionScanner {
869 fn properties(&self) -> &ScannerProperties {
870 &self.properties
871 }
872
873 fn schema(&self) -> SchemaRef {
874 self.schema.clone()
875 }
876
877 fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> {
878 self.properties.prepare(request);
879 Ok(())
880 }
881
882 fn scan_partition(
883 &self,
884 _ctx: &QueryScanContext,
885 _metrics_set: &ExecutionPlanMetricsSet,
886 _partition: usize,
887 ) -> Result<SendableRecordBatchStream, BoxedError> {
888 let mut stream = self.stream.lock().unwrap();
889 let result = stream
890 .take()
891 .or_else(|| Some(Box::pin(EmptyRecordBatchStream::new(self.schema.clone()))));
892 Ok(result.unwrap())
893 }
894
895 fn has_predicate_without_region(&self) -> bool {
896 false
897 }
898
899 fn metadata(&self) -> RegionMetadataRef {
900 self.metadata.clone()
901 }
902
903 fn set_logical_region(&mut self, logical_region: bool) {
904 self.properties.set_logical_region(logical_region);
905 }
906}
907
908impl DisplayAs for SinglePartitionScanner {
909 fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
910 write!(f, "{:?}", self)
911 }
912}