1use std::any::Any;
18use std::collections::HashMap;
19use std::fmt::{Debug, Display};
20use std::sync::{Arc, Mutex};
21
22use api::greptime_proto::v1::meta::{GrantedRegion as PbGrantedRegion, RegionRole as PbRegionRole};
23use api::region::RegionResponse;
24use async_trait::async_trait;
25use common_error::ext::BoxedError;
26use common_recordbatch::{EmptyRecordBatchStream, SendableRecordBatchStream};
27use common_time::Timestamp;
28use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
29use datafusion_physical_plan::{DisplayAs, DisplayFormatType};
30use datatypes::schema::SchemaRef;
31use futures::future::join_all;
32use serde::{Deserialize, Serialize};
33use tokio::sync::Semaphore;
34
35use crate::logstore::entry;
36use crate::metadata::RegionMetadataRef;
37use crate::region_request::{
38 BatchRegionDdlRequest, RegionOpenRequest, RegionRequest, RegionSequencesRequest,
39};
40use crate::storage::{RegionId, ScanRequest, SequenceNumber};
41
/// Role states accepted by `RegionEngine::set_region_role_state_gracefully`.
///
/// Each variant converts into the [`RegionRole`] variant of the same name.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SettableRegionRoleState {
    /// The region acts as a follower.
    Follower,
    /// The region is a leader that is being downgraded.
    DowngradingLeader,
}

impl Display for SettableRegionRoleState {
    /// Writes `Follower` or `Leader(Downgrading)` depending on the variant.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            Self::Follower => "Follower",
            Self::DowngradingLeader => "Leader(Downgrading)",
        };
        f.write_str(label)
    }
}
57
58impl From<SettableRegionRoleState> for RegionRole {
59 fn from(value: SettableRegionRoleState) -> Self {
60 match value {
61 SettableRegionRoleState::Follower => RegionRole::Follower,
62 SettableRegionRoleState::DowngradingLeader => RegionRole::DowngradingLeader,
63 }
64 }
65}
66
67#[derive(Debug, PartialEq, Eq)]
69pub struct SetRegionRoleStateRequest {
70 region_id: RegionId,
71 region_role_state: SettableRegionRoleState,
72}
73
74#[derive(Debug, PartialEq, Eq)]
76pub enum SetRegionRoleStateSuccess {
77 File,
78 Mito {
79 last_entry_id: entry::Id,
80 },
81 Metric {
82 last_entry_id: entry::Id,
83 metadata_last_entry_id: entry::Id,
84 },
85}
86
87impl SetRegionRoleStateSuccess {
88 pub fn file() -> Self {
90 Self::File
91 }
92
93 pub fn mito(last_entry_id: entry::Id) -> Self {
95 SetRegionRoleStateSuccess::Mito { last_entry_id }
96 }
97
98 pub fn metric(last_entry_id: entry::Id, metadata_last_entry_id: entry::Id) -> Self {
100 SetRegionRoleStateSuccess::Metric {
101 last_entry_id,
102 metadata_last_entry_id,
103 }
104 }
105}
106
107impl SetRegionRoleStateSuccess {
108 pub fn last_entry_id(&self) -> Option<entry::Id> {
110 match self {
111 SetRegionRoleStateSuccess::File => None,
112 SetRegionRoleStateSuccess::Mito { last_entry_id } => Some(*last_entry_id),
113 SetRegionRoleStateSuccess::Metric { last_entry_id, .. } => Some(*last_entry_id),
114 }
115 }
116
117 pub fn metadata_last_entry_id(&self) -> Option<entry::Id> {
119 match self {
120 SetRegionRoleStateSuccess::File => None,
121 SetRegionRoleStateSuccess::Mito { .. } => None,
122 SetRegionRoleStateSuccess::Metric {
123 metadata_last_entry_id,
124 ..
125 } => Some(*metadata_last_entry_id),
126 }
127 }
128}
129
130#[derive(Debug, PartialEq, Eq)]
132pub enum SetRegionRoleStateResponse {
133 Success(SetRegionRoleStateSuccess),
134 NotFound,
135}
136
137impl SetRegionRoleStateResponse {
138 pub fn success(success: SetRegionRoleStateSuccess) -> Self {
140 Self::Success(success)
141 }
142
143 pub fn is_not_found(&self) -> bool {
145 matches!(self, SetRegionRoleStateResponse::NotFound)
146 }
147}
148
149#[derive(Debug, Clone, PartialEq, Eq)]
150pub struct GrantedRegion {
151 pub region_id: RegionId,
152 pub region_role: RegionRole,
153 pub extensions: HashMap<String, Vec<u8>>,
154}
155
156impl GrantedRegion {
157 pub fn new(region_id: RegionId, region_role: RegionRole) -> Self {
158 Self {
159 region_id,
160 region_role,
161 extensions: HashMap::new(),
162 }
163 }
164}
165
166impl From<GrantedRegion> for PbGrantedRegion {
167 fn from(value: GrantedRegion) -> Self {
168 PbGrantedRegion {
169 region_id: value.region_id.as_u64(),
170 role: PbRegionRole::from(value.region_role).into(),
171 extensions: value.extensions,
172 }
173 }
174}
175
176impl From<PbGrantedRegion> for GrantedRegion {
177 fn from(value: PbGrantedRegion) -> Self {
178 GrantedRegion {
179 region_id: RegionId::from_u64(value.region_id),
180 region_role: value.role().into(),
181 extensions: value.extensions,
182 }
183 }
184}
185
186#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
189pub enum RegionRole {
190 Follower,
192 Leader,
194 DowngradingLeader,
198}
199
200impl Display for RegionRole {
201 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
202 match self {
203 RegionRole::Follower => write!(f, "Follower"),
204 RegionRole::Leader => write!(f, "Leader"),
205 RegionRole::DowngradingLeader => write!(f, "Leader(Downgrading)"),
206 }
207 }
208}
209
210impl RegionRole {
211 pub fn writable(&self) -> bool {
212 matches!(self, RegionRole::Leader)
213 }
214}
215
216impl From<RegionRole> for PbRegionRole {
217 fn from(value: RegionRole) -> Self {
218 match value {
219 RegionRole::Follower => PbRegionRole::Follower,
220 RegionRole::Leader => PbRegionRole::Leader,
221 RegionRole::DowngradingLeader => PbRegionRole::DowngradingLeader,
222 }
223 }
224}
225
226impl From<PbRegionRole> for RegionRole {
227 fn from(value: PbRegionRole) -> Self {
228 match value {
229 PbRegionRole::Leader => RegionRole::Leader,
230 PbRegionRole::Follower => RegionRole::Follower,
231 PbRegionRole::DowngradingLeader => RegionRole::DowngradingLeader,
232 }
233 }
234}
235
/// Describes how a scanner's output is partitioned.
#[derive(Debug)]
pub enum ScannerPartitioning {
    /// Unspecified partitioning scheme that still carries a partition count.
    Unknown(usize),
}

impl ScannerPartitioning {
    /// Returns the partition count stored in the variant.
    pub fn num_partitions(&self) -> usize {
        // The enum has a single variant, so this pattern is irrefutable.
        let Self::Unknown(count) = self;
        *count
    }
}
251
252#[derive(Debug, Clone, Copy, PartialEq, Eq)]
254pub struct PartitionRange {
255 pub start: Timestamp,
257 pub end: Timestamp,
259 pub num_rows: usize,
261 pub identifier: usize,
263}
264
265#[derive(Debug, Default)]
267pub struct ScannerProperties {
268 pub partitions: Vec<Vec<PartitionRange>>,
273
274 append_mode: bool,
276
277 total_rows: usize,
280
281 pub distinguish_partition_range: bool,
283
284 target_partitions: usize,
286
287 logical_region: bool,
289}
290
291impl ScannerProperties {
292 pub fn with_append_mode(mut self, append_mode: bool) -> Self {
294 self.append_mode = append_mode;
295 self
296 }
297
298 pub fn with_total_rows(mut self, total_rows: usize) -> Self {
300 self.total_rows = total_rows;
301 self
302 }
303
304 pub fn new(partitions: Vec<Vec<PartitionRange>>, append_mode: bool, total_rows: usize) -> Self {
306 Self {
307 partitions,
308 append_mode,
309 total_rows,
310 distinguish_partition_range: false,
311 target_partitions: 0,
312 logical_region: false,
313 }
314 }
315
316 pub fn prepare(&mut self, request: PrepareRequest) {
318 if let Some(ranges) = request.ranges {
319 self.partitions = ranges;
320 }
321 if let Some(distinguish_partition_range) = request.distinguish_partition_range {
322 self.distinguish_partition_range = distinguish_partition_range;
323 }
324 if let Some(target_partitions) = request.target_partitions {
325 self.target_partitions = target_partitions;
326 }
327 }
328
329 pub fn num_partitions(&self) -> usize {
331 self.partitions.len()
332 }
333
334 pub fn append_mode(&self) -> bool {
335 self.append_mode
336 }
337
338 pub fn total_rows(&self) -> usize {
339 self.total_rows
340 }
341
342 pub fn is_logical_region(&self) -> bool {
344 self.logical_region
345 }
346
347 pub fn target_partitions(&self) -> usize {
349 if self.target_partitions == 0 {
350 self.num_partitions()
351 } else {
352 self.target_partitions
353 }
354 }
355
356 pub fn set_logical_region(&mut self, logical_region: bool) {
358 self.logical_region = logical_region;
359 }
360}
361
362#[derive(Default)]
364pub struct PrepareRequest {
365 pub ranges: Option<Vec<Vec<PartitionRange>>>,
367 pub distinguish_partition_range: Option<bool>,
369 pub target_partitions: Option<usize>,
371}
372
373impl PrepareRequest {
374 pub fn with_ranges(mut self, ranges: Vec<Vec<PartitionRange>>) -> Self {
376 self.ranges = Some(ranges);
377 self
378 }
379
380 pub fn with_distinguish_partition_range(mut self, distinguish_partition_range: bool) -> Self {
382 self.distinguish_partition_range = Some(distinguish_partition_range);
383 self
384 }
385
386 pub fn with_target_partitions(mut self, target_partitions: usize) -> Self {
388 self.target_partitions = Some(target_partitions);
389 self
390 }
391}
392
/// Per-query context handed to `RegionScanner::scan_partition`.
#[derive(Default, Clone)]
pub struct QueryScanContext {
    /// Verbose-explain flag; defaults to `false`.
    /// NOTE(review): presumably set for `EXPLAIN ... VERBOSE` queries — confirm with callers.
    pub explain_verbose: bool,
}
399
400pub trait RegionScanner: Debug + DisplayAs + Send {
405 fn properties(&self) -> &ScannerProperties;
407
408 fn schema(&self) -> SchemaRef;
410
411 fn metadata(&self) -> RegionMetadataRef;
413
414 fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError>;
418
419 fn scan_partition(
424 &self,
425 ctx: &QueryScanContext,
426 metrics_set: &ExecutionPlanMetricsSet,
427 partition: usize,
428 ) -> Result<SendableRecordBatchStream, BoxedError>;
429
430 fn has_predicate(&self) -> bool;
432
433 fn set_logical_region(&mut self, logical_region: bool);
435}
436
437pub type RegionScannerRef = Box<dyn RegionScanner>;
438
439pub type BatchResponses = Vec<(RegionId, Result<RegionResponse, BoxedError>)>;
440
441#[derive(Debug, Deserialize, Serialize, Default)]
443pub struct RegionStatistic {
444 #[serde(default)]
446 pub num_rows: u64,
447 pub memtable_size: u64,
449 pub wal_size: u64,
451 pub manifest_size: u64,
453 pub sst_size: u64,
455 pub sst_num: u64,
457 #[serde(default)]
459 pub index_size: u64,
460 #[serde(default)]
462 pub manifest: RegionManifestInfo,
463 #[serde(default)]
467 pub data_topic_latest_entry_id: u64,
468 #[serde(default)]
469 pub metadata_topic_latest_entry_id: u64,
470}
471
472#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
474pub enum RegionManifestInfo {
475 Mito {
476 manifest_version: u64,
477 flushed_entry_id: u64,
478 },
479 Metric {
480 data_manifest_version: u64,
481 data_flushed_entry_id: u64,
482 metadata_manifest_version: u64,
483 metadata_flushed_entry_id: u64,
484 },
485}
486
487impl RegionManifestInfo {
488 pub fn mito(manifest_version: u64, flushed_entry_id: u64) -> Self {
490 Self::Mito {
491 manifest_version,
492 flushed_entry_id,
493 }
494 }
495
496 pub fn metric(
498 data_manifest_version: u64,
499 data_flushed_entry_id: u64,
500 metadata_manifest_version: u64,
501 metadata_flushed_entry_id: u64,
502 ) -> Self {
503 Self::Metric {
504 data_manifest_version,
505 data_flushed_entry_id,
506 metadata_manifest_version,
507 metadata_flushed_entry_id,
508 }
509 }
510
511 pub fn is_mito(&self) -> bool {
513 matches!(self, RegionManifestInfo::Mito { .. })
514 }
515
516 pub fn is_metric(&self) -> bool {
518 matches!(self, RegionManifestInfo::Metric { .. })
519 }
520
521 pub fn data_flushed_entry_id(&self) -> u64 {
523 match self {
524 RegionManifestInfo::Mito {
525 flushed_entry_id, ..
526 } => *flushed_entry_id,
527 RegionManifestInfo::Metric {
528 data_flushed_entry_id,
529 ..
530 } => *data_flushed_entry_id,
531 }
532 }
533
534 pub fn data_manifest_version(&self) -> u64 {
536 match self {
537 RegionManifestInfo::Mito {
538 manifest_version, ..
539 } => *manifest_version,
540 RegionManifestInfo::Metric {
541 data_manifest_version,
542 ..
543 } => *data_manifest_version,
544 }
545 }
546
547 pub fn metadata_manifest_version(&self) -> Option<u64> {
549 match self {
550 RegionManifestInfo::Mito { .. } => None,
551 RegionManifestInfo::Metric {
552 metadata_manifest_version,
553 ..
554 } => Some(*metadata_manifest_version),
555 }
556 }
557
558 pub fn metadata_flushed_entry_id(&self) -> Option<u64> {
560 match self {
561 RegionManifestInfo::Mito { .. } => None,
562 RegionManifestInfo::Metric {
563 metadata_flushed_entry_id,
564 ..
565 } => Some(*metadata_flushed_entry_id),
566 }
567 }
568
569 pub fn encode_list(manifest_infos: &[(RegionId, Self)]) -> serde_json::Result<Vec<u8>> {
571 serde_json::to_vec(manifest_infos)
572 }
573
574 pub fn decode_list(value: &[u8]) -> serde_json::Result<Vec<(RegionId, Self)>> {
576 serde_json::from_slice(value)
577 }
578}
579
580impl Default for RegionManifestInfo {
581 fn default() -> Self {
582 Self::Mito {
583 manifest_version: 0,
584 flushed_entry_id: 0,
585 }
586 }
587}
588
589impl RegionStatistic {
590 pub fn deserialize_from_slice(value: &[u8]) -> Option<RegionStatistic> {
594 serde_json::from_slice(value).ok()
595 }
596
597 pub fn serialize_to_vec(&self) -> Option<Vec<u8>> {
601 serde_json::to_vec(self).ok()
602 }
603}
604
605impl RegionStatistic {
606 pub fn estimated_disk_size(&self) -> u64 {
608 self.wal_size + self.sst_size + self.manifest_size + self.index_size
609 }
610}
611
612#[derive(Debug)]
614pub enum SyncManifestResponse {
615 NotSupported,
616 Mito {
617 synced: bool,
619 },
620 Metric {
621 metadata_synced: bool,
623 data_synced: bool,
625 new_opened_logical_region_ids: Vec<RegionId>,
628 },
629}
630
631impl SyncManifestResponse {
632 pub fn is_data_synced(&self) -> bool {
634 match self {
635 SyncManifestResponse::NotSupported => false,
636 SyncManifestResponse::Mito { synced } => *synced,
637 SyncManifestResponse::Metric { data_synced, .. } => *data_synced,
638 }
639 }
640
641 pub fn is_supported(&self) -> bool {
643 matches!(self, SyncManifestResponse::NotSupported)
644 }
645
646 pub fn is_mito(&self) -> bool {
648 matches!(self, SyncManifestResponse::Mito { .. })
649 }
650
651 pub fn is_metric(&self) -> bool {
653 matches!(self, SyncManifestResponse::Metric { .. })
654 }
655
656 pub fn new_opened_logical_region_ids(self) -> Option<Vec<RegionId>> {
658 match self {
659 SyncManifestResponse::Metric {
660 new_opened_logical_region_ids,
661 ..
662 } => Some(new_opened_logical_region_ids),
663 _ => None,
664 }
665 }
666}
667
668#[async_trait]
669pub trait RegionEngine: Send + Sync {
670 fn name(&self) -> &str;
672
673 async fn handle_batch_open_requests(
675 &self,
676 parallelism: usize,
677 requests: Vec<(RegionId, RegionOpenRequest)>,
678 ) -> Result<BatchResponses, BoxedError> {
679 let semaphore = Arc::new(Semaphore::new(parallelism));
680 let mut tasks = Vec::with_capacity(requests.len());
681
682 for (region_id, request) in requests {
683 let semaphore_moved = semaphore.clone();
684
685 tasks.push(async move {
686 let _permit = semaphore_moved.acquire().await.unwrap();
688 let result = self
689 .handle_request(region_id, RegionRequest::Open(request))
690 .await;
691 (region_id, result)
692 });
693 }
694
695 Ok(join_all(tasks).await)
696 }
697
698 async fn handle_batch_ddl_requests(
699 &self,
700 request: BatchRegionDdlRequest,
701 ) -> Result<RegionResponse, BoxedError> {
702 let requests = request.into_region_requests();
703
704 let mut affected_rows = 0;
705 let mut extensions = HashMap::new();
706
707 for (region_id, request) in requests {
708 let result = self.handle_request(region_id, request).await?;
709 affected_rows += result.affected_rows;
710 extensions.extend(result.extensions);
711 }
712
713 Ok(RegionResponse {
714 affected_rows,
715 extensions,
716 metadata: Vec::new(),
717 })
718 }
719
720 async fn handle_request(
722 &self,
723 region_id: RegionId,
724 request: RegionRequest,
725 ) -> Result<RegionResponse, BoxedError>;
726
727 async fn get_last_seq_num(
729 &self,
730 region_id: RegionId,
731 ) -> Result<Option<SequenceNumber>, BoxedError>;
732
733 async fn get_region_sequences(
734 &self,
735 seqs: RegionSequencesRequest,
736 ) -> Result<HashMap<u64, u64>, BoxedError> {
737 let mut results = HashMap::with_capacity(seqs.region_ids.len());
738
739 for region_id in seqs.region_ids {
740 let seq = self.get_last_seq_num(region_id).await?.unwrap_or_default();
741 results.insert(region_id.as_u64(), seq);
742 }
743
744 Ok(results)
745 }
746
747 async fn handle_query(
749 &self,
750 region_id: RegionId,
751 request: ScanRequest,
752 ) -> Result<RegionScannerRef, BoxedError>;
753
754 async fn get_metadata(&self, region_id: RegionId) -> Result<RegionMetadataRef, BoxedError>;
756
757 fn region_statistic(&self, region_id: RegionId) -> Option<RegionStatistic>;
759
760 async fn stop(&self) -> Result<(), BoxedError>;
762
763 fn set_region_role(&self, region_id: RegionId, role: RegionRole) -> Result<(), BoxedError>;
769
770 async fn sync_region(
772 &self,
773 region_id: RegionId,
774 manifest_info: RegionManifestInfo,
775 ) -> Result<SyncManifestResponse, BoxedError>;
776
777 async fn set_region_role_state_gracefully(
781 &self,
782 region_id: RegionId,
783 region_role_state: SettableRegionRoleState,
784 ) -> Result<SetRegionRoleStateResponse, BoxedError>;
785
786 fn role(&self, region_id: RegionId) -> Option<RegionRole>;
790
791 fn as_any(&self) -> &dyn Any;
792}
793
794pub type RegionEngineRef = Arc<dyn RegionEngine>;
795
796pub struct SinglePartitionScanner {
798 stream: Mutex<Option<SendableRecordBatchStream>>,
799 schema: SchemaRef,
800 properties: ScannerProperties,
801 metadata: RegionMetadataRef,
802}
803
804impl SinglePartitionScanner {
805 pub fn new(
807 stream: SendableRecordBatchStream,
808 append_mode: bool,
809 metadata: RegionMetadataRef,
810 ) -> Self {
811 let schema = stream.schema();
812 Self {
813 stream: Mutex::new(Some(stream)),
814 schema,
815 properties: ScannerProperties::default().with_append_mode(append_mode),
816 metadata,
817 }
818 }
819}
820
821impl Debug for SinglePartitionScanner {
822 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
823 write!(f, "SinglePartitionScanner: <SendableRecordBatchStream>")
824 }
825}
826
827impl RegionScanner for SinglePartitionScanner {
828 fn properties(&self) -> &ScannerProperties {
829 &self.properties
830 }
831
832 fn schema(&self) -> SchemaRef {
833 self.schema.clone()
834 }
835
836 fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> {
837 self.properties.prepare(request);
838 Ok(())
839 }
840
841 fn scan_partition(
842 &self,
843 _ctx: &QueryScanContext,
844 _metrics_set: &ExecutionPlanMetricsSet,
845 _partition: usize,
846 ) -> Result<SendableRecordBatchStream, BoxedError> {
847 let mut stream = self.stream.lock().unwrap();
848 let result = stream
849 .take()
850 .or_else(|| Some(Box::pin(EmptyRecordBatchStream::new(self.schema.clone()))));
851 Ok(result.unwrap())
852 }
853
854 fn has_predicate(&self) -> bool {
855 false
856 }
857
858 fn metadata(&self) -> RegionMetadataRef {
859 self.metadata.clone()
860 }
861
862 fn set_logical_region(&mut self, logical_region: bool) {
863 self.properties.set_logical_region(logical_region);
864 }
865}
866
867impl DisplayAs for SinglePartitionScanner {
868 fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
869 write!(f, "{:?}", self)
870 }
871}