1use std::any::Any;
18use std::collections::HashMap;
19use std::fmt::{Debug, Display};
20use std::sync::{Arc, Mutex};
21
22use api::greptime_proto::v1::meta::{GrantedRegion as PbGrantedRegion, RegionRole as PbRegionRole};
23use api::region::RegionResponse;
24use async_trait::async_trait;
25use common_error::ext::BoxedError;
26use common_recordbatch::{EmptyRecordBatchStream, SendableRecordBatchStream};
27use common_time::Timestamp;
28use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
29use datafusion_physical_plan::{DisplayAs, DisplayFormatType};
30use datatypes::schema::SchemaRef;
31use futures::future::join_all;
32use serde::{Deserialize, Serialize};
33use tokio::sync::Semaphore;
34
35use crate::logstore::entry;
36use crate::metadata::RegionMetadataRef;
37use crate::region_request::{BatchRegionDdlRequest, RegionOpenRequest, RegionRequest};
38use crate::storage::{RegionId, ScanRequest, SequenceNumber};
39
/// A region role state that callers may request for a region.
///
/// Unlike [`RegionRole`], this distinguishes a staging leader from a plain leader.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum SettableRegionRoleState {
    /// The follower role.
    Follower,
    /// A leader that is being downgraded.
    DowngradingLeader,
    /// A plain leader.
    Leader,
    /// A leader in the staging state.
    StagingLeader,
}

impl Display for SettableRegionRoleState {
    /// Writes a human-readable label; leader sub-states render as `Leader(<state>)`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            Self::Follower => "Follower",
            Self::DowngradingLeader => "Leader(Downgrading)",
            Self::Leader => "Leader",
            Self::StagingLeader => "Leader(Staging)",
        };
        f.write_str(label)
    }
}
61
62impl From<SettableRegionRoleState> for RegionRole {
63 fn from(value: SettableRegionRoleState) -> Self {
64 match value {
65 SettableRegionRoleState::Follower => RegionRole::Follower,
66 SettableRegionRoleState::DowngradingLeader => RegionRole::DowngradingLeader,
67 SettableRegionRoleState::Leader => RegionRole::Leader,
68 SettableRegionRoleState::StagingLeader => RegionRole::Leader, }
70 }
71}
72
73#[derive(Debug, PartialEq, Eq)]
75pub struct SetRegionRoleStateRequest {
76 region_id: RegionId,
77 region_role_state: SettableRegionRoleState,
78}
79
80#[derive(Debug, PartialEq, Eq)]
82pub enum SetRegionRoleStateSuccess {
83 File,
84 Mito {
85 last_entry_id: entry::Id,
86 },
87 Metric {
88 last_entry_id: entry::Id,
89 metadata_last_entry_id: entry::Id,
90 },
91}
92
93impl SetRegionRoleStateSuccess {
94 pub fn file() -> Self {
96 Self::File
97 }
98
99 pub fn mito(last_entry_id: entry::Id) -> Self {
101 SetRegionRoleStateSuccess::Mito { last_entry_id }
102 }
103
104 pub fn metric(last_entry_id: entry::Id, metadata_last_entry_id: entry::Id) -> Self {
106 SetRegionRoleStateSuccess::Metric {
107 last_entry_id,
108 metadata_last_entry_id,
109 }
110 }
111}
112
113impl SetRegionRoleStateSuccess {
114 pub fn last_entry_id(&self) -> Option<entry::Id> {
116 match self {
117 SetRegionRoleStateSuccess::File => None,
118 SetRegionRoleStateSuccess::Mito { last_entry_id } => Some(*last_entry_id),
119 SetRegionRoleStateSuccess::Metric { last_entry_id, .. } => Some(*last_entry_id),
120 }
121 }
122
123 pub fn metadata_last_entry_id(&self) -> Option<entry::Id> {
125 match self {
126 SetRegionRoleStateSuccess::File => None,
127 SetRegionRoleStateSuccess::Mito { .. } => None,
128 SetRegionRoleStateSuccess::Metric {
129 metadata_last_entry_id,
130 ..
131 } => Some(*metadata_last_entry_id),
132 }
133 }
134}
135
136#[derive(Debug)]
138pub enum SetRegionRoleStateResponse {
139 Success(SetRegionRoleStateSuccess),
140 NotFound,
141 InvalidTransition(BoxedError),
142}
143
144impl SetRegionRoleStateResponse {
145 pub fn success(success: SetRegionRoleStateSuccess) -> Self {
147 Self::Success(success)
148 }
149
150 pub fn invalid_transition(error: BoxedError) -> Self {
152 Self::InvalidTransition(error)
153 }
154
155 pub fn is_not_found(&self) -> bool {
157 matches!(self, SetRegionRoleStateResponse::NotFound)
158 }
159
160 pub fn is_invalid_transition(&self) -> bool {
162 matches!(self, SetRegionRoleStateResponse::InvalidTransition(_))
163 }
164}
165
166#[derive(Debug, Clone, PartialEq, Eq)]
167pub struct GrantedRegion {
168 pub region_id: RegionId,
169 pub region_role: RegionRole,
170 pub extensions: HashMap<String, Vec<u8>>,
171}
172
173impl GrantedRegion {
174 pub fn new(region_id: RegionId, region_role: RegionRole) -> Self {
175 Self {
176 region_id,
177 region_role,
178 extensions: HashMap::new(),
179 }
180 }
181}
182
183impl From<GrantedRegion> for PbGrantedRegion {
184 fn from(value: GrantedRegion) -> Self {
185 PbGrantedRegion {
186 region_id: value.region_id.as_u64(),
187 role: PbRegionRole::from(value.region_role).into(),
188 extensions: value.extensions,
189 }
190 }
191}
192
193impl From<PbGrantedRegion> for GrantedRegion {
194 fn from(value: PbGrantedRegion) -> Self {
195 GrantedRegion {
196 region_id: RegionId::from_u64(value.region_id),
197 region_role: value.role().into(),
198 extensions: value.extensions,
199 }
200 }
201}
202
203#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
206pub enum RegionRole {
207 Follower,
209 Leader,
211 DowngradingLeader,
215}
216
217impl Display for RegionRole {
218 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
219 match self {
220 RegionRole::Follower => write!(f, "Follower"),
221 RegionRole::Leader => write!(f, "Leader"),
222 RegionRole::DowngradingLeader => write!(f, "Leader(Downgrading)"),
223 }
224 }
225}
226
227impl RegionRole {
228 pub fn writable(&self) -> bool {
229 matches!(self, RegionRole::Leader)
230 }
231}
232
233impl From<RegionRole> for PbRegionRole {
234 fn from(value: RegionRole) -> Self {
235 match value {
236 RegionRole::Follower => PbRegionRole::Follower,
237 RegionRole::Leader => PbRegionRole::Leader,
238 RegionRole::DowngradingLeader => PbRegionRole::DowngradingLeader,
239 }
240 }
241}
242
243impl From<PbRegionRole> for RegionRole {
244 fn from(value: PbRegionRole) -> Self {
245 match value {
246 PbRegionRole::Leader => RegionRole::Leader,
247 PbRegionRole::Follower => RegionRole::Follower,
248 PbRegionRole::DowngradingLeader => RegionRole::DowngradingLeader,
249 }
250 }
251}
252
/// How a scanner's output is partitioned.
#[derive(Debug)]
pub enum ScannerPartitioning {
    /// An unknown partitioning scheme with the given number of partitions.
    Unknown(usize),
}

impl ScannerPartitioning {
    /// Returns the number of partitions.
    pub fn num_partitions(&self) -> usize {
        // Single-variant enum, so this pattern is irrefutable.
        let Self::Unknown(count) = self;
        *count
    }
}
268
269#[derive(Debug, Clone, Copy, PartialEq, Eq)]
271pub struct PartitionRange {
272 pub start: Timestamp,
274 pub end: Timestamp,
276 pub num_rows: usize,
278 pub identifier: usize,
280}
281
282#[derive(Debug, Default)]
284pub struct ScannerProperties {
285 pub partitions: Vec<Vec<PartitionRange>>,
290
291 append_mode: bool,
293
294 total_rows: usize,
297
298 pub distinguish_partition_range: bool,
300
301 target_partitions: usize,
303
304 logical_region: bool,
306}
307
308impl ScannerProperties {
309 pub fn with_append_mode(mut self, append_mode: bool) -> Self {
311 self.append_mode = append_mode;
312 self
313 }
314
315 pub fn with_total_rows(mut self, total_rows: usize) -> Self {
317 self.total_rows = total_rows;
318 self
319 }
320
321 pub fn new(partitions: Vec<Vec<PartitionRange>>, append_mode: bool, total_rows: usize) -> Self {
323 Self {
324 partitions,
325 append_mode,
326 total_rows,
327 distinguish_partition_range: false,
328 target_partitions: 0,
329 logical_region: false,
330 }
331 }
332
333 pub fn prepare(&mut self, request: PrepareRequest) {
335 if let Some(ranges) = request.ranges {
336 self.partitions = ranges;
337 }
338 if let Some(distinguish_partition_range) = request.distinguish_partition_range {
339 self.distinguish_partition_range = distinguish_partition_range;
340 }
341 if let Some(target_partitions) = request.target_partitions {
342 self.target_partitions = target_partitions;
343 }
344 }
345
346 pub fn num_partitions(&self) -> usize {
348 self.partitions.len()
349 }
350
351 pub fn append_mode(&self) -> bool {
352 self.append_mode
353 }
354
355 pub fn total_rows(&self) -> usize {
356 self.total_rows
357 }
358
359 pub fn is_logical_region(&self) -> bool {
361 self.logical_region
362 }
363
364 pub fn target_partitions(&self) -> usize {
366 if self.target_partitions == 0 {
367 self.num_partitions()
368 } else {
369 self.target_partitions
370 }
371 }
372
373 pub fn set_logical_region(&mut self, logical_region: bool) {
375 self.logical_region = logical_region;
376 }
377}
378
379#[derive(Default)]
381pub struct PrepareRequest {
382 pub ranges: Option<Vec<Vec<PartitionRange>>>,
384 pub distinguish_partition_range: Option<bool>,
386 pub target_partitions: Option<usize>,
388}
389
390impl PrepareRequest {
391 pub fn with_ranges(mut self, ranges: Vec<Vec<PartitionRange>>) -> Self {
393 self.ranges = Some(ranges);
394 self
395 }
396
397 pub fn with_distinguish_partition_range(mut self, distinguish_partition_range: bool) -> Self {
399 self.distinguish_partition_range = Some(distinguish_partition_range);
400 self
401 }
402
403 pub fn with_target_partitions(mut self, target_partitions: usize) -> Self {
405 self.target_partitions = Some(target_partitions);
406 self
407 }
408}
409
/// Query-level options handed to [`RegionScanner::scan_partition`].
#[derive(Clone, Default)]
pub struct QueryScanContext {
    /// Verbose-explain flag; presumably set for `EXPLAIN VERBOSE` — confirm with callers.
    pub explain_verbose: bool,
}
416
417pub trait RegionScanner: Debug + DisplayAs + Send {
422 fn properties(&self) -> &ScannerProperties;
424
425 fn schema(&self) -> SchemaRef;
427
428 fn metadata(&self) -> RegionMetadataRef;
430
431 fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError>;
435
436 fn scan_partition(
441 &self,
442 ctx: &QueryScanContext,
443 metrics_set: &ExecutionPlanMetricsSet,
444 partition: usize,
445 ) -> Result<SendableRecordBatchStream, BoxedError>;
446
447 fn has_predicate(&self) -> bool;
449
450 fn set_logical_region(&mut self, logical_region: bool);
452}
453
454pub type RegionScannerRef = Box<dyn RegionScanner>;
455
456pub type BatchResponses = Vec<(RegionId, Result<RegionResponse, BoxedError>)>;
457
458#[derive(Debug, Deserialize, Serialize, Default)]
460pub struct RegionStatistic {
461 #[serde(default)]
463 pub num_rows: u64,
464 pub memtable_size: u64,
466 pub wal_size: u64,
468 pub manifest_size: u64,
470 pub sst_size: u64,
472 pub sst_num: u64,
474 #[serde(default)]
476 pub index_size: u64,
477 #[serde(default)]
479 pub manifest: RegionManifestInfo,
480 #[serde(default)]
481 pub written_bytes: u64,
483 #[serde(default)]
487 pub data_topic_latest_entry_id: u64,
488 #[serde(default)]
489 pub metadata_topic_latest_entry_id: u64,
490}
491
492#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
494pub enum RegionManifestInfo {
495 Mito {
496 manifest_version: u64,
497 flushed_entry_id: u64,
498 },
499 Metric {
500 data_manifest_version: u64,
501 data_flushed_entry_id: u64,
502 metadata_manifest_version: u64,
503 metadata_flushed_entry_id: u64,
504 },
505}
506
507impl RegionManifestInfo {
508 pub fn mito(manifest_version: u64, flushed_entry_id: u64) -> Self {
510 Self::Mito {
511 manifest_version,
512 flushed_entry_id,
513 }
514 }
515
516 pub fn metric(
518 data_manifest_version: u64,
519 data_flushed_entry_id: u64,
520 metadata_manifest_version: u64,
521 metadata_flushed_entry_id: u64,
522 ) -> Self {
523 Self::Metric {
524 data_manifest_version,
525 data_flushed_entry_id,
526 metadata_manifest_version,
527 metadata_flushed_entry_id,
528 }
529 }
530
531 pub fn is_mito(&self) -> bool {
533 matches!(self, RegionManifestInfo::Mito { .. })
534 }
535
536 pub fn is_metric(&self) -> bool {
538 matches!(self, RegionManifestInfo::Metric { .. })
539 }
540
541 pub fn data_flushed_entry_id(&self) -> u64 {
543 match self {
544 RegionManifestInfo::Mito {
545 flushed_entry_id, ..
546 } => *flushed_entry_id,
547 RegionManifestInfo::Metric {
548 data_flushed_entry_id,
549 ..
550 } => *data_flushed_entry_id,
551 }
552 }
553
554 pub fn data_manifest_version(&self) -> u64 {
556 match self {
557 RegionManifestInfo::Mito {
558 manifest_version, ..
559 } => *manifest_version,
560 RegionManifestInfo::Metric {
561 data_manifest_version,
562 ..
563 } => *data_manifest_version,
564 }
565 }
566
567 pub fn metadata_manifest_version(&self) -> Option<u64> {
569 match self {
570 RegionManifestInfo::Mito { .. } => None,
571 RegionManifestInfo::Metric {
572 metadata_manifest_version,
573 ..
574 } => Some(*metadata_manifest_version),
575 }
576 }
577
578 pub fn metadata_flushed_entry_id(&self) -> Option<u64> {
580 match self {
581 RegionManifestInfo::Mito { .. } => None,
582 RegionManifestInfo::Metric {
583 metadata_flushed_entry_id,
584 ..
585 } => Some(*metadata_flushed_entry_id),
586 }
587 }
588
589 pub fn encode_list(manifest_infos: &[(RegionId, Self)]) -> serde_json::Result<Vec<u8>> {
591 serde_json::to_vec(manifest_infos)
592 }
593
594 pub fn decode_list(value: &[u8]) -> serde_json::Result<Vec<(RegionId, Self)>> {
596 serde_json::from_slice(value)
597 }
598}
599
600impl Default for RegionManifestInfo {
601 fn default() -> Self {
602 Self::Mito {
603 manifest_version: 0,
604 flushed_entry_id: 0,
605 }
606 }
607}
608
609impl RegionStatistic {
610 pub fn deserialize_from_slice(value: &[u8]) -> Option<RegionStatistic> {
614 serde_json::from_slice(value).ok()
615 }
616
617 pub fn serialize_to_vec(&self) -> Option<Vec<u8>> {
621 serde_json::to_vec(self).ok()
622 }
623}
624
625impl RegionStatistic {
626 pub fn estimated_disk_size(&self) -> u64 {
628 self.wal_size + self.sst_size + self.manifest_size + self.index_size
629 }
630}
631
632#[derive(Debug)]
634pub enum SyncManifestResponse {
635 NotSupported,
636 Mito {
637 synced: bool,
639 },
640 Metric {
641 metadata_synced: bool,
643 data_synced: bool,
645 new_opened_logical_region_ids: Vec<RegionId>,
648 },
649}
650
651impl SyncManifestResponse {
652 pub fn is_data_synced(&self) -> bool {
654 match self {
655 SyncManifestResponse::NotSupported => false,
656 SyncManifestResponse::Mito { synced } => *synced,
657 SyncManifestResponse::Metric { data_synced, .. } => *data_synced,
658 }
659 }
660
661 pub fn is_supported(&self) -> bool {
663 matches!(self, SyncManifestResponse::NotSupported)
664 }
665
666 pub fn is_mito(&self) -> bool {
668 matches!(self, SyncManifestResponse::Mito { .. })
669 }
670
671 pub fn is_metric(&self) -> bool {
673 matches!(self, SyncManifestResponse::Metric { .. })
674 }
675
676 pub fn new_opened_logical_region_ids(self) -> Option<Vec<RegionId>> {
678 match self {
679 SyncManifestResponse::Metric {
680 new_opened_logical_region_ids,
681 ..
682 } => Some(new_opened_logical_region_ids),
683 _ => None,
684 }
685 }
686}
687
688#[async_trait]
689pub trait RegionEngine: Send + Sync {
690 fn name(&self) -> &str;
692
693 async fn handle_batch_open_requests(
695 &self,
696 parallelism: usize,
697 requests: Vec<(RegionId, RegionOpenRequest)>,
698 ) -> Result<BatchResponses, BoxedError> {
699 let semaphore = Arc::new(Semaphore::new(parallelism));
700 let mut tasks = Vec::with_capacity(requests.len());
701
702 for (region_id, request) in requests {
703 let semaphore_moved = semaphore.clone();
704
705 tasks.push(async move {
706 let _permit = semaphore_moved.acquire().await.unwrap();
708 let result = self
709 .handle_request(region_id, RegionRequest::Open(request))
710 .await;
711 (region_id, result)
712 });
713 }
714
715 Ok(join_all(tasks).await)
716 }
717
718 async fn handle_batch_ddl_requests(
719 &self,
720 request: BatchRegionDdlRequest,
721 ) -> Result<RegionResponse, BoxedError> {
722 let requests = request.into_region_requests();
723
724 let mut affected_rows = 0;
725 let mut extensions = HashMap::new();
726
727 for (region_id, request) in requests {
728 let result = self.handle_request(region_id, request).await?;
729 affected_rows += result.affected_rows;
730 extensions.extend(result.extensions);
731 }
732
733 Ok(RegionResponse {
734 affected_rows,
735 extensions,
736 metadata: Vec::new(),
737 })
738 }
739
740 async fn handle_request(
742 &self,
743 region_id: RegionId,
744 request: RegionRequest,
745 ) -> Result<RegionResponse, BoxedError>;
746
747 async fn get_committed_sequence(
749 &self,
750 region_id: RegionId,
751 ) -> Result<SequenceNumber, BoxedError>;
752
753 async fn handle_query(
755 &self,
756 region_id: RegionId,
757 request: ScanRequest,
758 ) -> Result<RegionScannerRef, BoxedError>;
759
760 async fn get_metadata(&self, region_id: RegionId) -> Result<RegionMetadataRef, BoxedError>;
762
763 fn region_statistic(&self, region_id: RegionId) -> Option<RegionStatistic>;
765
766 async fn stop(&self) -> Result<(), BoxedError>;
768
769 fn set_region_role(&self, region_id: RegionId, role: RegionRole) -> Result<(), BoxedError>;
775
776 async fn sync_region(
778 &self,
779 region_id: RegionId,
780 manifest_info: RegionManifestInfo,
781 ) -> Result<SyncManifestResponse, BoxedError>;
782
783 async fn set_region_role_state_gracefully(
787 &self,
788 region_id: RegionId,
789 region_role_state: SettableRegionRoleState,
790 ) -> Result<SetRegionRoleStateResponse, BoxedError>;
791
792 fn role(&self, region_id: RegionId) -> Option<RegionRole>;
796
797 fn as_any(&self) -> &dyn Any;
798}
799
800pub type RegionEngineRef = Arc<dyn RegionEngine>;
801
802pub struct SinglePartitionScanner {
804 stream: Mutex<Option<SendableRecordBatchStream>>,
805 schema: SchemaRef,
806 properties: ScannerProperties,
807 metadata: RegionMetadataRef,
808}
809
810impl SinglePartitionScanner {
811 pub fn new(
813 stream: SendableRecordBatchStream,
814 append_mode: bool,
815 metadata: RegionMetadataRef,
816 ) -> Self {
817 let schema = stream.schema();
818 Self {
819 stream: Mutex::new(Some(stream)),
820 schema,
821 properties: ScannerProperties::default().with_append_mode(append_mode),
822 metadata,
823 }
824 }
825}
826
827impl Debug for SinglePartitionScanner {
828 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
829 write!(f, "SinglePartitionScanner: <SendableRecordBatchStream>")
830 }
831}
832
833impl RegionScanner for SinglePartitionScanner {
834 fn properties(&self) -> &ScannerProperties {
835 &self.properties
836 }
837
838 fn schema(&self) -> SchemaRef {
839 self.schema.clone()
840 }
841
842 fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> {
843 self.properties.prepare(request);
844 Ok(())
845 }
846
847 fn scan_partition(
848 &self,
849 _ctx: &QueryScanContext,
850 _metrics_set: &ExecutionPlanMetricsSet,
851 _partition: usize,
852 ) -> Result<SendableRecordBatchStream, BoxedError> {
853 let mut stream = self.stream.lock().unwrap();
854 let result = stream
855 .take()
856 .or_else(|| Some(Box::pin(EmptyRecordBatchStream::new(self.schema.clone()))));
857 Ok(result.unwrap())
858 }
859
860 fn has_predicate(&self) -> bool {
861 false
862 }
863
864 fn metadata(&self) -> RegionMetadataRef {
865 self.metadata.clone()
866 }
867
868 fn set_logical_region(&mut self, logical_region: bool) {
869 self.properties.set_logical_region(logical_region);
870 }
871}
872
873impl DisplayAs for SinglePartitionScanner {
874 fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
875 write!(f, "{:?}", self)
876 }
877}