1use std::any::Any;
18use std::collections::HashMap;
19use std::fmt::{Debug, Display};
20use std::sync::{Arc, Mutex};
21
22use api::greptime_proto::v1::meta::{GrantedRegion as PbGrantedRegion, RegionRole as PbRegionRole};
23use api::region::RegionResponse;
24use async_trait::async_trait;
25use common_error::ext::{BoxedError, PlainError};
26use common_error::status_code::StatusCode;
27use common_recordbatch::SendableRecordBatchStream;
28use common_time::Timestamp;
29use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
30use datafusion_physical_plan::{DisplayAs, DisplayFormatType};
31use datatypes::schema::SchemaRef;
32use futures::future::join_all;
33use serde::{Deserialize, Serialize};
34use tokio::sync::Semaphore;
35
36use crate::logstore::entry;
37use crate::metadata::RegionMetadataRef;
38use crate::region_request::{
39 BatchRegionDdlRequest, RegionOpenRequest, RegionRequest, RegionSequencesRequest,
40};
41use crate::storage::{RegionId, ScanRequest, SequenceNumber};
42
/// A region role state that can be requested explicitly.
///
/// Only a subset of [`RegionRole`] is representable here: there is no plain
/// `Leader` variant.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum SettableRegionRoleState {
    /// Converts to [`RegionRole::Follower`].
    Follower,
    /// Converts to [`RegionRole::DowngradingLeader`].
    DowngradingLeader,
}

impl Display for SettableRegionRoleState {
    /// Writes the human-readable label of the state.
    ///
    /// The labels are identical to the ones used by the matching
    /// [`RegionRole`] variants.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            Self::Follower => "Follower",
            Self::DowngradingLeader => "Leader(Downgrading)",
        };
        f.write_str(label)
    }
}
58
59impl From<SettableRegionRoleState> for RegionRole {
60 fn from(value: SettableRegionRoleState) -> Self {
61 match value {
62 SettableRegionRoleState::Follower => RegionRole::Follower,
63 SettableRegionRoleState::DowngradingLeader => RegionRole::DowngradingLeader,
64 }
65 }
66}
67
/// A request that pairs a region with the role state it should be set to.
///
/// NOTE(review): both fields are private and no constructor is visible in this
/// file — confirm how callers are expected to build this value.
#[derive(Debug, PartialEq, Eq)]
pub struct SetRegionRoleStateRequest {
    /// Id of the targeted region.
    region_id: RegionId,
    /// The role state requested for that region.
    region_role_state: SettableRegionRoleState,
}
74
/// Payload carried by [`SetRegionRoleStateResponse::Success`].
///
/// The variants differ in which log entry ids they carry; presumably there is
/// one variant per engine kind (file / mito / metric) — confirm with the
/// engine implementations.
#[derive(Debug, PartialEq, Eq)]
pub enum SetRegionRoleStateSuccess {
    /// Carries no entry id.
    File,
    /// Carries a single last entry id.
    Mito {
        last_entry_id: entry::Id,
    },
    /// Carries a last entry id plus a separate one for the metadata.
    Metric {
        last_entry_id: entry::Id,
        metadata_last_entry_id: entry::Id,
    },
}
87
88impl SetRegionRoleStateSuccess {
89 pub fn file() -> Self {
91 Self::File
92 }
93
94 pub fn mito(last_entry_id: entry::Id) -> Self {
96 SetRegionRoleStateSuccess::Mito { last_entry_id }
97 }
98
99 pub fn metric(last_entry_id: entry::Id, metadata_last_entry_id: entry::Id) -> Self {
101 SetRegionRoleStateSuccess::Metric {
102 last_entry_id,
103 metadata_last_entry_id,
104 }
105 }
106}
107
108impl SetRegionRoleStateSuccess {
109 pub fn last_entry_id(&self) -> Option<entry::Id> {
111 match self {
112 SetRegionRoleStateSuccess::File => None,
113 SetRegionRoleStateSuccess::Mito { last_entry_id } => Some(*last_entry_id),
114 SetRegionRoleStateSuccess::Metric { last_entry_id, .. } => Some(*last_entry_id),
115 }
116 }
117
118 pub fn metadata_last_entry_id(&self) -> Option<entry::Id> {
120 match self {
121 SetRegionRoleStateSuccess::File => None,
122 SetRegionRoleStateSuccess::Mito { .. } => None,
123 SetRegionRoleStateSuccess::Metric {
124 metadata_last_entry_id,
125 ..
126 } => Some(*metadata_last_entry_id),
127 }
128 }
129}
130
/// Result of a request to set a region's role state.
#[derive(Debug, PartialEq, Eq)]
pub enum SetRegionRoleStateResponse {
    /// The state was set; the payload carries the resulting entry ids, if any.
    Success(SetRegionRoleStateSuccess),
    /// NOTE(review): presumably the targeted region does not exist on this
    /// engine — confirm with the implementors.
    NotFound,
}
137
138impl SetRegionRoleStateResponse {
139 pub fn success(success: SetRegionRoleStateSuccess) -> Self {
141 Self::Success(success)
142 }
143
144 pub fn is_not_found(&self) -> bool {
146 matches!(self, SetRegionRoleStateResponse::NotFound)
147 }
148}
149
/// A region together with the role assigned to it.
///
/// Converts to and from the protobuf `PbGrantedRegion` message.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct GrantedRegion {
    /// Id of the region.
    pub region_id: RegionId,
    /// Role associated with the region.
    pub region_role: RegionRole,
    /// Opaque key/value payload passed through unchanged to and from the
    /// protobuf message. NOTE(review): the meaning of the keys is not visible
    /// here — confirm.
    pub extensions: HashMap<String, Vec<u8>>,
}
156
157impl GrantedRegion {
158 pub fn new(region_id: RegionId, region_role: RegionRole) -> Self {
159 Self {
160 region_id,
161 region_role,
162 extensions: HashMap::new(),
163 }
164 }
165}
166
167impl From<GrantedRegion> for PbGrantedRegion {
168 fn from(value: GrantedRegion) -> Self {
169 PbGrantedRegion {
170 region_id: value.region_id.as_u64(),
171 role: PbRegionRole::from(value.region_role).into(),
172 extensions: value.extensions,
173 }
174 }
175}
176
177impl From<PbGrantedRegion> for GrantedRegion {
178 fn from(value: PbGrantedRegion) -> Self {
179 GrantedRegion {
180 region_id: RegionId::from_u64(value.region_id),
181 region_role: value.role().into(),
182 extensions: value.extensions,
183 }
184 }
185}
186
/// The role of a region.
///
/// Only [`RegionRole::Leader`] is reported as writable by
/// [`RegionRole::writable`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum RegionRole {
    /// Displayed as `Follower`; not writable.
    Follower,
    /// Displayed as `Leader`; the only writable role.
    Leader,
    /// Displayed as `Leader(Downgrading)`; not writable.
    DowngradingLeader,
}
200
201impl Display for RegionRole {
202 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
203 match self {
204 RegionRole::Follower => write!(f, "Follower"),
205 RegionRole::Leader => write!(f, "Leader"),
206 RegionRole::DowngradingLeader => write!(f, "Leader(Downgrading)"),
207 }
208 }
209}
210
211impl RegionRole {
212 pub fn writable(&self) -> bool {
213 matches!(self, RegionRole::Leader)
214 }
215}
216
217impl From<RegionRole> for PbRegionRole {
218 fn from(value: RegionRole) -> Self {
219 match value {
220 RegionRole::Follower => PbRegionRole::Follower,
221 RegionRole::Leader => PbRegionRole::Leader,
222 RegionRole::DowngradingLeader => PbRegionRole::DowngradingLeader,
223 }
224 }
225}
226
227impl From<PbRegionRole> for RegionRole {
228 fn from(value: PbRegionRole) -> Self {
229 match value {
230 PbRegionRole::Leader => RegionRole::Leader,
231 PbRegionRole::Follower => RegionRole::Follower,
232 PbRegionRole::DowngradingLeader => RegionRole::DowngradingLeader,
233 }
234 }
235}
236
/// Output partitioning scheme of a scanner.
#[derive(Debug)]
pub enum ScannerPartitioning {
    /// Unknown partitioning scheme; the payload is the number of partitions.
    Unknown(usize),
}

impl ScannerPartitioning {
    /// Returns the number of partitions carried by the scheme.
    pub fn num_partitions(&self) -> usize {
        // `Unknown` is the only variant, so the pattern is irrefutable.
        let Self::Unknown(count) = self;
        *count
    }
}
252
/// A time range that a scanner partition is made of.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct PartitionRange {
    /// Start of the range.
    pub start: Timestamp,
    /// End of the range.
    /// NOTE(review): whether the end is inclusive is not visible here — confirm.
    pub end: Timestamp,
    /// Number of rows in the range.
    /// NOTE(review): possibly an estimate rather than an exact count — confirm.
    pub num_rows: usize,
    /// Identifier of the range.
    /// NOTE(review): presumably assigned by the scanner that produced it — confirm.
    pub identifier: usize,
}
265
/// Properties describing a [`RegionScanner`].
#[derive(Debug, Default)]
pub struct ScannerProperties {
    /// Ranges grouped by partition; the outer length is what
    /// [`ScannerProperties::num_partitions`] reports.
    pub partitions: Vec<Vec<PartitionRange>>,

    /// Append-mode flag, exposed through [`ScannerProperties::append_mode`].
    append_mode: bool,

    /// Total row count, exposed through [`ScannerProperties::total_rows`].
    /// NOTE(review): possibly an estimate — confirm.
    total_rows: usize,

    /// Can be overwritten by [`ScannerProperties::prepare`].
    /// NOTE(review): presumably asks the scanner to keep partition ranges
    /// distinguishable in its output — confirm.
    pub distinguish_partition_range: bool,

    /// Requested number of partitions; `0` makes
    /// [`ScannerProperties::target_partitions`] fall back to the actual
    /// partition count.
    target_partitions: usize,

    /// Logical-region flag, see [`ScannerProperties::is_logical_region`].
    logical_region: bool,
}
291
292impl ScannerProperties {
293 pub fn with_append_mode(mut self, append_mode: bool) -> Self {
295 self.append_mode = append_mode;
296 self
297 }
298
299 pub fn with_total_rows(mut self, total_rows: usize) -> Self {
301 self.total_rows = total_rows;
302 self
303 }
304
305 pub fn new(partitions: Vec<Vec<PartitionRange>>, append_mode: bool, total_rows: usize) -> Self {
307 Self {
308 partitions,
309 append_mode,
310 total_rows,
311 distinguish_partition_range: false,
312 target_partitions: 0,
313 logical_region: false,
314 }
315 }
316
317 pub fn prepare(&mut self, request: PrepareRequest) {
319 if let Some(ranges) = request.ranges {
320 self.partitions = ranges;
321 }
322 if let Some(distinguish_partition_range) = request.distinguish_partition_range {
323 self.distinguish_partition_range = distinguish_partition_range;
324 }
325 if let Some(target_partitions) = request.target_partitions {
326 self.target_partitions = target_partitions;
327 }
328 }
329
330 pub fn num_partitions(&self) -> usize {
332 self.partitions.len()
333 }
334
335 pub fn append_mode(&self) -> bool {
336 self.append_mode
337 }
338
339 pub fn total_rows(&self) -> usize {
340 self.total_rows
341 }
342
343 pub fn is_logical_region(&self) -> bool {
345 self.logical_region
346 }
347
348 pub fn target_partitions(&self) -> usize {
350 if self.target_partitions == 0 {
351 self.num_partitions()
352 } else {
353 self.target_partitions
354 }
355 }
356
357 pub fn set_logical_region(&mut self, logical_region: bool) {
359 self.logical_region = logical_region;
360 }
361}
362
/// Optional settings handed to a scanner's `prepare`; every field left as
/// `None` keeps the scanner's current value.
#[derive(Default)]
pub struct PrepareRequest {
    /// New partition ranges, replacing the current ones when set.
    pub ranges: Option<Vec<Vec<PartitionRange>>>,
    /// New value for the `distinguish_partition_range` flag.
    pub distinguish_partition_range: Option<bool>,
    /// New target number of partitions.
    pub target_partitions: Option<usize>,
}
373
374impl PrepareRequest {
375 pub fn with_ranges(mut self, ranges: Vec<Vec<PartitionRange>>) -> Self {
377 self.ranges = Some(ranges);
378 self
379 }
380
381 pub fn with_distinguish_partition_range(mut self, distinguish_partition_range: bool) -> Self {
383 self.distinguish_partition_range = Some(distinguish_partition_range);
384 self
385 }
386
387 pub fn with_target_partitions(mut self, target_partitions: usize) -> Self {
389 self.target_partitions = Some(target_partitions);
390 self
391 }
392}
393
/// A scanner over a region that hands out one record batch stream per partition.
pub trait RegionScanner: Debug + DisplayAs + Send {
    /// Returns the properties of the scanner.
    fn properties(&self) -> &ScannerProperties;

    /// Returns the schema of the scanner.
    /// NOTE(review): presumably the schema of the produced record batches — confirm.
    fn schema(&self) -> SchemaRef;

    /// Returns the metadata of the scanned region.
    fn metadata(&self) -> RegionMetadataRef;

    /// Applies the settings in `request` to the scanner.
    fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError>;

    /// Returns the record batch stream for the partition with index `partition`.
    ///
    /// NOTE(review): behaviour for an out-of-range `partition` and the exact
    /// use of `metrics_set` are implementation-defined — confirm per scanner.
    fn scan_partition(
        &self,
        metrics_set: &ExecutionPlanMetricsSet,
        partition: usize,
    ) -> Result<SendableRecordBatchStream, BoxedError>;

    /// Returns whether the scanner has a predicate.
    fn has_predicate(&self) -> bool;

    /// Marks (or unmarks) the scanned region as a logical region.
    fn set_logical_region(&mut self, logical_region: bool);
}
429
/// An owned, dynamically dispatched [`RegionScanner`].
pub type RegionScannerRef = Box<dyn RegionScanner>;

/// Results of a batch request: each region id paired with its own outcome.
pub type BatchResponses = Vec<(RegionId, Result<RegionResponse, BoxedError>)>;
433
/// Statistics of a region, (de)serializable with serde.
///
/// Fields marked `#[serde(default)]` fall back to their default value when
/// missing from the serialized input; the others are required.
/// NOTE(review): the `*_size` fields are presumably byte counts — confirm.
#[derive(Debug, Deserialize, Serialize, Default)]
pub struct RegionStatistic {
    /// Number of rows in the region.
    #[serde(default)]
    pub num_rows: u64,
    /// Size of the memtables.
    pub memtable_size: u64,
    /// Size of the WAL.
    pub wal_size: u64,
    /// Size of the manifest.
    pub manifest_size: u64,
    /// Size of the SST files.
    pub sst_size: u64,
    /// Size of the index files.
    #[serde(default)]
    pub index_size: u64,
    /// Manifest versions and flushed entry ids of the region.
    #[serde(default)]
    pub manifest: RegionManifestInfo,
    /// Latest entry id of the data topic.
    /// NOTE(review): the meaning of `0` (e.g. "unknown") is not visible here — confirm.
    #[serde(default)]
    pub data_topic_latest_entry_id: u64,
    /// Latest entry id of the metadata topic.
    #[serde(default)]
    pub metadata_topic_latest_entry_id: u64,
}
462
/// Manifest versions and flushed entry ids of a region.
///
/// The default value is `Mito` with both numbers set to `0`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum RegionManifestInfo {
    /// A single manifest version / flushed entry id pair.
    Mito {
        manifest_version: u64,
        flushed_entry_id: u64,
    },
    /// Separate version / flushed entry id pairs for the data and the
    /// metadata parts.
    Metric {
        data_manifest_version: u64,
        data_flushed_entry_id: u64,
        metadata_manifest_version: u64,
        metadata_flushed_entry_id: u64,
    },
}
477
478impl RegionManifestInfo {
479 pub fn mito(manifest_version: u64, flushed_entry_id: u64) -> Self {
481 Self::Mito {
482 manifest_version,
483 flushed_entry_id,
484 }
485 }
486
487 pub fn metric(
489 data_manifest_version: u64,
490 data_flushed_entry_id: u64,
491 metadata_manifest_version: u64,
492 metadata_flushed_entry_id: u64,
493 ) -> Self {
494 Self::Metric {
495 data_manifest_version,
496 data_flushed_entry_id,
497 metadata_manifest_version,
498 metadata_flushed_entry_id,
499 }
500 }
501
502 pub fn is_mito(&self) -> bool {
504 matches!(self, RegionManifestInfo::Mito { .. })
505 }
506
507 pub fn is_metric(&self) -> bool {
509 matches!(self, RegionManifestInfo::Metric { .. })
510 }
511
512 pub fn data_flushed_entry_id(&self) -> u64 {
514 match self {
515 RegionManifestInfo::Mito {
516 flushed_entry_id, ..
517 } => *flushed_entry_id,
518 RegionManifestInfo::Metric {
519 data_flushed_entry_id,
520 ..
521 } => *data_flushed_entry_id,
522 }
523 }
524
525 pub fn data_manifest_version(&self) -> u64 {
527 match self {
528 RegionManifestInfo::Mito {
529 manifest_version, ..
530 } => *manifest_version,
531 RegionManifestInfo::Metric {
532 data_manifest_version,
533 ..
534 } => *data_manifest_version,
535 }
536 }
537
538 pub fn metadata_manifest_version(&self) -> Option<u64> {
540 match self {
541 RegionManifestInfo::Mito { .. } => None,
542 RegionManifestInfo::Metric {
543 metadata_manifest_version,
544 ..
545 } => Some(*metadata_manifest_version),
546 }
547 }
548
549 pub fn metadata_flushed_entry_id(&self) -> Option<u64> {
551 match self {
552 RegionManifestInfo::Mito { .. } => None,
553 RegionManifestInfo::Metric {
554 metadata_flushed_entry_id,
555 ..
556 } => Some(*metadata_flushed_entry_id),
557 }
558 }
559
560 pub fn encode_list(manifest_infos: &[(RegionId, Self)]) -> serde_json::Result<Vec<u8>> {
562 serde_json::to_vec(manifest_infos)
563 }
564
565 pub fn decode_list(value: &[u8]) -> serde_json::Result<Vec<(RegionId, Self)>> {
567 serde_json::from_slice(value)
568 }
569}
570
571impl Default for RegionManifestInfo {
572 fn default() -> Self {
573 Self::Mito {
574 manifest_version: 0,
575 flushed_entry_id: 0,
576 }
577 }
578}
579
580impl RegionStatistic {
581 pub fn deserialize_from_slice(value: &[u8]) -> Option<RegionStatistic> {
585 serde_json::from_slice(value).ok()
586 }
587
588 pub fn serialize_to_vec(&self) -> Option<Vec<u8>> {
592 serde_json::to_vec(self).ok()
593 }
594}
595
596impl RegionStatistic {
597 pub fn estimated_disk_size(&self) -> u64 {
599 self.wal_size + self.sst_size + self.manifest_size + self.index_size
600 }
601}
602
603#[derive(Debug)]
605pub enum SyncManifestResponse {
606 NotSupported,
607 Mito {
608 synced: bool,
610 },
611 Metric {
612 metadata_synced: bool,
614 data_synced: bool,
616 new_opened_logical_region_ids: Vec<RegionId>,
619 },
620}
621
622impl SyncManifestResponse {
623 pub fn is_data_synced(&self) -> bool {
625 match self {
626 SyncManifestResponse::NotSupported => false,
627 SyncManifestResponse::Mito { synced } => *synced,
628 SyncManifestResponse::Metric { data_synced, .. } => *data_synced,
629 }
630 }
631
632 pub fn is_supported(&self) -> bool {
634 matches!(self, SyncManifestResponse::NotSupported)
635 }
636
637 pub fn is_mito(&self) -> bool {
639 matches!(self, SyncManifestResponse::Mito { .. })
640 }
641
642 pub fn is_metric(&self) -> bool {
644 matches!(self, SyncManifestResponse::Metric { .. })
645 }
646
647 pub fn new_opened_logical_region_ids(self) -> Option<Vec<RegionId>> {
649 match self {
650 SyncManifestResponse::Metric {
651 new_opened_logical_region_ids,
652 ..
653 } => Some(new_opened_logical_region_ids),
654 _ => None,
655 }
656 }
657}
658
659#[async_trait]
660pub trait RegionEngine: Send + Sync {
661 fn name(&self) -> &str;
663
664 async fn handle_batch_open_requests(
666 &self,
667 parallelism: usize,
668 requests: Vec<(RegionId, RegionOpenRequest)>,
669 ) -> Result<BatchResponses, BoxedError> {
670 let semaphore = Arc::new(Semaphore::new(parallelism));
671 let mut tasks = Vec::with_capacity(requests.len());
672
673 for (region_id, request) in requests {
674 let semaphore_moved = semaphore.clone();
675
676 tasks.push(async move {
677 let _permit = semaphore_moved.acquire().await.unwrap();
679 let result = self
680 .handle_request(region_id, RegionRequest::Open(request))
681 .await;
682 (region_id, result)
683 });
684 }
685
686 Ok(join_all(tasks).await)
687 }
688
689 async fn handle_batch_ddl_requests(
690 &self,
691 request: BatchRegionDdlRequest,
692 ) -> Result<RegionResponse, BoxedError> {
693 let requests = request.into_region_requests();
694
695 let mut affected_rows = 0;
696 let mut extensions = HashMap::new();
697
698 for (region_id, request) in requests {
699 let result = self.handle_request(region_id, request).await?;
700 affected_rows += result.affected_rows;
701 extensions.extend(result.extensions);
702 }
703
704 Ok(RegionResponse {
705 affected_rows,
706 extensions,
707 })
708 }
709
710 async fn handle_request(
712 &self,
713 region_id: RegionId,
714 request: RegionRequest,
715 ) -> Result<RegionResponse, BoxedError>;
716
717 async fn get_last_seq_num(
719 &self,
720 region_id: RegionId,
721 ) -> Result<Option<SequenceNumber>, BoxedError>;
722
723 async fn get_region_sequences(
724 &self,
725 seqs: RegionSequencesRequest,
726 ) -> Result<HashMap<u64, u64>, BoxedError> {
727 let mut results = HashMap::with_capacity(seqs.region_ids.len());
728
729 for region_id in seqs.region_ids {
730 let seq = self.get_last_seq_num(region_id).await?.unwrap_or_default();
731 results.insert(region_id.as_u64(), seq);
732 }
733
734 Ok(results)
735 }
736
737 async fn handle_query(
739 &self,
740 region_id: RegionId,
741 request: ScanRequest,
742 ) -> Result<RegionScannerRef, BoxedError>;
743
744 async fn get_metadata(&self, region_id: RegionId) -> Result<RegionMetadataRef, BoxedError>;
746
747 fn region_statistic(&self, region_id: RegionId) -> Option<RegionStatistic>;
749
750 async fn stop(&self) -> Result<(), BoxedError>;
752
753 fn set_region_role(&self, region_id: RegionId, role: RegionRole) -> Result<(), BoxedError>;
759
760 async fn sync_region(
762 &self,
763 region_id: RegionId,
764 manifest_info: RegionManifestInfo,
765 ) -> Result<SyncManifestResponse, BoxedError>;
766
767 async fn set_region_role_state_gracefully(
771 &self,
772 region_id: RegionId,
773 region_role_state: SettableRegionRoleState,
774 ) -> Result<SetRegionRoleStateResponse, BoxedError>;
775
776 fn role(&self, region_id: RegionId) -> Option<RegionRole>;
780
781 fn as_any(&self) -> &dyn Any;
782}
783
/// A shared, dynamically dispatched [`RegionEngine`].
pub type RegionEngineRef = Arc<dyn RegionEngine>;
785
/// A [`RegionScanner`] that hands out a single, pre-built stream exactly once.
pub struct SinglePartitionScanner {
    /// The stream to hand out; becomes `None` once it has been taken.
    stream: Mutex<Option<SendableRecordBatchStream>>,
    /// Schema captured from the stream at construction time.
    schema: SchemaRef,
    /// Properties of the scanner.
    properties: ScannerProperties,
    /// Metadata of the scanned region.
    metadata: RegionMetadataRef,
}
793
794impl SinglePartitionScanner {
795 pub fn new(
797 stream: SendableRecordBatchStream,
798 append_mode: bool,
799 metadata: RegionMetadataRef,
800 ) -> Self {
801 let schema = stream.schema();
802 Self {
803 stream: Mutex::new(Some(stream)),
804 schema,
805 properties: ScannerProperties::default().with_append_mode(append_mode),
806 metadata,
807 }
808 }
809}
810
811impl Debug for SinglePartitionScanner {
812 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
813 write!(f, "SinglePartitionScanner: <SendableRecordBatchStream>")
814 }
815}
816
817impl RegionScanner for SinglePartitionScanner {
818 fn properties(&self) -> &ScannerProperties {
819 &self.properties
820 }
821
822 fn schema(&self) -> SchemaRef {
823 self.schema.clone()
824 }
825
826 fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> {
827 self.properties.prepare(request);
828 Ok(())
829 }
830
831 fn scan_partition(
832 &self,
833 _metrics_set: &ExecutionPlanMetricsSet,
834 _partition: usize,
835 ) -> Result<SendableRecordBatchStream, BoxedError> {
836 let mut stream = self.stream.lock().unwrap();
837 stream.take().ok_or_else(|| {
838 BoxedError::new(PlainError::new(
839 "Not expected to run ExecutionPlan more than once".to_string(),
840 StatusCode::Unexpected,
841 ))
842 })
843 }
844
845 fn has_predicate(&self) -> bool {
846 false
847 }
848
849 fn metadata(&self) -> RegionMetadataRef {
850 self.metadata.clone()
851 }
852
853 fn set_logical_region(&mut self, logical_region: bool) {
854 self.properties.set_logical_region(logical_region);
855 }
856}
857
858impl DisplayAs for SinglePartitionScanner {
859 fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
860 write!(f, "{:?}", self)
861 }
862}