1use std::any::Any;
18use std::collections::HashMap;
19use std::fmt::{Debug, Display};
20use std::sync::{Arc, Mutex};
21
22use api::greptime_proto::v1::meta::{GrantedRegion as PbGrantedRegion, RegionRole as PbRegionRole};
23use api::region::RegionResponse;
24use async_trait::async_trait;
25use common_error::ext::BoxedError;
26use common_recordbatch::{EmptyRecordBatchStream, SendableRecordBatchStream};
27use common_time::Timestamp;
28use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
29use datafusion_physical_plan::{DisplayAs, DisplayFormatType};
30use datatypes::schema::SchemaRef;
31use futures::future::join_all;
32use serde::{Deserialize, Serialize};
33use tokio::sync::Semaphore;
34
35use crate::logstore::entry;
36use crate::metadata::RegionMetadataRef;
37use crate::region_request::{
38 BatchRegionDdlRequest, RegionOpenRequest, RegionRequest, RegionSequencesRequest,
39};
40use crate::storage::{RegionId, ScanRequest, SequenceNumber};
41
/// The region role states that can be requested through a
/// set-region-role-state operation.
///
/// Every variant converts into the [`RegionRole`] variant of the same name.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SettableRegionRoleState {
    /// Converts into [`RegionRole::Follower`].
    Follower,
    /// Converts into [`RegionRole::DowngradingLeader`].
    DowngradingLeader,
}
48
49impl Display for SettableRegionRoleState {
50 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
51 match self {
52 SettableRegionRoleState::Follower => write!(f, "Follower"),
53 SettableRegionRoleState::DowngradingLeader => write!(f, "Leader(Downgrading)"),
54 }
55 }
56}
57
58impl From<SettableRegionRoleState> for RegionRole {
59 fn from(value: SettableRegionRoleState) -> Self {
60 match value {
61 SettableRegionRoleState::Follower => RegionRole::Follower,
62 SettableRegionRoleState::DowngradingLeader => RegionRole::DowngradingLeader,
63 }
64 }
65}
66
67#[derive(Debug, PartialEq, Eq)]
69pub struct SetRegionRoleStateRequest {
70 region_id: RegionId,
71 region_role_state: SettableRegionRoleState,
72}
73
74#[derive(Debug, PartialEq, Eq)]
76pub enum SetRegionRoleStateSuccess {
77 File,
78 Mito {
79 last_entry_id: entry::Id,
80 },
81 Metric {
82 last_entry_id: entry::Id,
83 metadata_last_entry_id: entry::Id,
84 },
85}
86
87impl SetRegionRoleStateSuccess {
88 pub fn file() -> Self {
90 Self::File
91 }
92
93 pub fn mito(last_entry_id: entry::Id) -> Self {
95 SetRegionRoleStateSuccess::Mito { last_entry_id }
96 }
97
98 pub fn metric(last_entry_id: entry::Id, metadata_last_entry_id: entry::Id) -> Self {
100 SetRegionRoleStateSuccess::Metric {
101 last_entry_id,
102 metadata_last_entry_id,
103 }
104 }
105}
106
107impl SetRegionRoleStateSuccess {
108 pub fn last_entry_id(&self) -> Option<entry::Id> {
110 match self {
111 SetRegionRoleStateSuccess::File => None,
112 SetRegionRoleStateSuccess::Mito { last_entry_id } => Some(*last_entry_id),
113 SetRegionRoleStateSuccess::Metric { last_entry_id, .. } => Some(*last_entry_id),
114 }
115 }
116
117 pub fn metadata_last_entry_id(&self) -> Option<entry::Id> {
119 match self {
120 SetRegionRoleStateSuccess::File => None,
121 SetRegionRoleStateSuccess::Mito { .. } => None,
122 SetRegionRoleStateSuccess::Metric {
123 metadata_last_entry_id,
124 ..
125 } => Some(*metadata_last_entry_id),
126 }
127 }
128}
129
130#[derive(Debug, PartialEq, Eq)]
132pub enum SetRegionRoleStateResponse {
133 Success(SetRegionRoleStateSuccess),
134 NotFound,
135}
136
137impl SetRegionRoleStateResponse {
138 pub fn success(success: SetRegionRoleStateSuccess) -> Self {
140 Self::Success(success)
141 }
142
143 pub fn is_not_found(&self) -> bool {
145 matches!(self, SetRegionRoleStateResponse::NotFound)
146 }
147}
148
149#[derive(Debug, Clone, PartialEq, Eq)]
150pub struct GrantedRegion {
151 pub region_id: RegionId,
152 pub region_role: RegionRole,
153 pub extensions: HashMap<String, Vec<u8>>,
154}
155
156impl GrantedRegion {
157 pub fn new(region_id: RegionId, region_role: RegionRole) -> Self {
158 Self {
159 region_id,
160 region_role,
161 extensions: HashMap::new(),
162 }
163 }
164}
165
166impl From<GrantedRegion> for PbGrantedRegion {
167 fn from(value: GrantedRegion) -> Self {
168 PbGrantedRegion {
169 region_id: value.region_id.as_u64(),
170 role: PbRegionRole::from(value.region_role).into(),
171 extensions: value.extensions,
172 }
173 }
174}
175
176impl From<PbGrantedRegion> for GrantedRegion {
177 fn from(value: PbGrantedRegion) -> Self {
178 GrantedRegion {
179 region_id: RegionId::from_u64(value.region_id),
180 region_role: value.role().into(),
181 extensions: value.extensions,
182 }
183 }
184}
185
186#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
189pub enum RegionRole {
190 Follower,
192 Leader,
194 DowngradingLeader,
198}
199
200impl Display for RegionRole {
201 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
202 match self {
203 RegionRole::Follower => write!(f, "Follower"),
204 RegionRole::Leader => write!(f, "Leader"),
205 RegionRole::DowngradingLeader => write!(f, "Leader(Downgrading)"),
206 }
207 }
208}
209
210impl RegionRole {
211 pub fn writable(&self) -> bool {
212 matches!(self, RegionRole::Leader)
213 }
214}
215
216impl From<RegionRole> for PbRegionRole {
217 fn from(value: RegionRole) -> Self {
218 match value {
219 RegionRole::Follower => PbRegionRole::Follower,
220 RegionRole::Leader => PbRegionRole::Leader,
221 RegionRole::DowngradingLeader => PbRegionRole::DowngradingLeader,
222 }
223 }
224}
225
226impl From<PbRegionRole> for RegionRole {
227 fn from(value: PbRegionRole) -> Self {
228 match value {
229 PbRegionRole::Leader => RegionRole::Leader,
230 PbRegionRole::Follower => RegionRole::Follower,
231 PbRegionRole::DowngradingLeader => RegionRole::DowngradingLeader,
232 }
233 }
234}
235
/// Describes how the output of a scanner is partitioned.
#[derive(Debug)]
pub enum ScannerPartitioning {
    /// A partitioning labelled "unknown" that still carries the number of
    /// partitions.
    Unknown(usize),
}
242
243impl ScannerPartitioning {
244 pub fn num_partitions(&self) -> usize {
246 match self {
247 ScannerPartitioning::Unknown(num_partitions) => *num_partitions,
248 }
249 }
250}
251
252#[derive(Debug, Clone, Copy, PartialEq, Eq)]
254pub struct PartitionRange {
255 pub start: Timestamp,
257 pub end: Timestamp,
259 pub num_rows: usize,
261 pub identifier: usize,
263}
264
265#[derive(Debug, Default)]
267pub struct ScannerProperties {
268 pub partitions: Vec<Vec<PartitionRange>>,
273
274 append_mode: bool,
276
277 total_rows: usize,
280
281 pub distinguish_partition_range: bool,
283
284 target_partitions: usize,
286
287 logical_region: bool,
289}
290
291impl ScannerProperties {
292 pub fn with_append_mode(mut self, append_mode: bool) -> Self {
294 self.append_mode = append_mode;
295 self
296 }
297
298 pub fn with_total_rows(mut self, total_rows: usize) -> Self {
300 self.total_rows = total_rows;
301 self
302 }
303
304 pub fn new(partitions: Vec<Vec<PartitionRange>>, append_mode: bool, total_rows: usize) -> Self {
306 Self {
307 partitions,
308 append_mode,
309 total_rows,
310 distinguish_partition_range: false,
311 target_partitions: 0,
312 logical_region: false,
313 }
314 }
315
316 pub fn prepare(&mut self, request: PrepareRequest) {
318 if let Some(ranges) = request.ranges {
319 self.partitions = ranges;
320 }
321 if let Some(distinguish_partition_range) = request.distinguish_partition_range {
322 self.distinguish_partition_range = distinguish_partition_range;
323 }
324 if let Some(target_partitions) = request.target_partitions {
325 self.target_partitions = target_partitions;
326 }
327 }
328
329 pub fn num_partitions(&self) -> usize {
331 self.partitions.len()
332 }
333
334 pub fn append_mode(&self) -> bool {
335 self.append_mode
336 }
337
338 pub fn total_rows(&self) -> usize {
339 self.total_rows
340 }
341
342 pub fn is_logical_region(&self) -> bool {
344 self.logical_region
345 }
346
347 pub fn target_partitions(&self) -> usize {
349 if self.target_partitions == 0 {
350 self.num_partitions()
351 } else {
352 self.target_partitions
353 }
354 }
355
356 pub fn set_logical_region(&mut self, logical_region: bool) {
358 self.logical_region = logical_region;
359 }
360}
361
362#[derive(Default)]
364pub struct PrepareRequest {
365 pub ranges: Option<Vec<Vec<PartitionRange>>>,
367 pub distinguish_partition_range: Option<bool>,
369 pub target_partitions: Option<usize>,
371}
372
373impl PrepareRequest {
374 pub fn with_ranges(mut self, ranges: Vec<Vec<PartitionRange>>) -> Self {
376 self.ranges = Some(ranges);
377 self
378 }
379
380 pub fn with_distinguish_partition_range(mut self, distinguish_partition_range: bool) -> Self {
382 self.distinguish_partition_range = Some(distinguish_partition_range);
383 self
384 }
385
386 pub fn with_target_partitions(mut self, target_partitions: usize) -> Self {
388 self.target_partitions = Some(target_partitions);
389 self
390 }
391}
392
393pub trait RegionScanner: Debug + DisplayAs + Send {
398 fn properties(&self) -> &ScannerProperties;
400
401 fn schema(&self) -> SchemaRef;
403
404 fn metadata(&self) -> RegionMetadataRef;
406
407 fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError>;
411
412 fn scan_partition(
417 &self,
418 metrics_set: &ExecutionPlanMetricsSet,
419 partition: usize,
420 ) -> Result<SendableRecordBatchStream, BoxedError>;
421
422 fn has_predicate(&self) -> bool;
424
425 fn set_logical_region(&mut self, logical_region: bool);
427}
428
429pub type RegionScannerRef = Box<dyn RegionScanner>;
430
431pub type BatchResponses = Vec<(RegionId, Result<RegionResponse, BoxedError>)>;
432
433#[derive(Debug, Deserialize, Serialize, Default)]
435pub struct RegionStatistic {
436 #[serde(default)]
438 pub num_rows: u64,
439 pub memtable_size: u64,
441 pub wal_size: u64,
443 pub manifest_size: u64,
445 pub sst_size: u64,
447 #[serde(default)]
449 pub index_size: u64,
450 #[serde(default)]
452 pub manifest: RegionManifestInfo,
453 #[serde(default)]
457 pub data_topic_latest_entry_id: u64,
458 #[serde(default)]
459 pub metadata_topic_latest_entry_id: u64,
460}
461
462#[derive(Debug, Clone, Serialize, Deserialize)]
464pub enum RegionManifestInfo {
465 Mito {
466 manifest_version: u64,
467 flushed_entry_id: u64,
468 },
469 Metric {
470 data_manifest_version: u64,
471 data_flushed_entry_id: u64,
472 metadata_manifest_version: u64,
473 metadata_flushed_entry_id: u64,
474 },
475}
476
477impl RegionManifestInfo {
478 pub fn mito(manifest_version: u64, flushed_entry_id: u64) -> Self {
480 Self::Mito {
481 manifest_version,
482 flushed_entry_id,
483 }
484 }
485
486 pub fn metric(
488 data_manifest_version: u64,
489 data_flushed_entry_id: u64,
490 metadata_manifest_version: u64,
491 metadata_flushed_entry_id: u64,
492 ) -> Self {
493 Self::Metric {
494 data_manifest_version,
495 data_flushed_entry_id,
496 metadata_manifest_version,
497 metadata_flushed_entry_id,
498 }
499 }
500
501 pub fn is_mito(&self) -> bool {
503 matches!(self, RegionManifestInfo::Mito { .. })
504 }
505
506 pub fn is_metric(&self) -> bool {
508 matches!(self, RegionManifestInfo::Metric { .. })
509 }
510
511 pub fn data_flushed_entry_id(&self) -> u64 {
513 match self {
514 RegionManifestInfo::Mito {
515 flushed_entry_id, ..
516 } => *flushed_entry_id,
517 RegionManifestInfo::Metric {
518 data_flushed_entry_id,
519 ..
520 } => *data_flushed_entry_id,
521 }
522 }
523
524 pub fn data_manifest_version(&self) -> u64 {
526 match self {
527 RegionManifestInfo::Mito {
528 manifest_version, ..
529 } => *manifest_version,
530 RegionManifestInfo::Metric {
531 data_manifest_version,
532 ..
533 } => *data_manifest_version,
534 }
535 }
536
537 pub fn metadata_manifest_version(&self) -> Option<u64> {
539 match self {
540 RegionManifestInfo::Mito { .. } => None,
541 RegionManifestInfo::Metric {
542 metadata_manifest_version,
543 ..
544 } => Some(*metadata_manifest_version),
545 }
546 }
547
548 pub fn metadata_flushed_entry_id(&self) -> Option<u64> {
550 match self {
551 RegionManifestInfo::Mito { .. } => None,
552 RegionManifestInfo::Metric {
553 metadata_flushed_entry_id,
554 ..
555 } => Some(*metadata_flushed_entry_id),
556 }
557 }
558
559 pub fn encode_list(manifest_infos: &[(RegionId, Self)]) -> serde_json::Result<Vec<u8>> {
561 serde_json::to_vec(manifest_infos)
562 }
563
564 pub fn decode_list(value: &[u8]) -> serde_json::Result<Vec<(RegionId, Self)>> {
566 serde_json::from_slice(value)
567 }
568}
569
570impl Default for RegionManifestInfo {
571 fn default() -> Self {
572 Self::Mito {
573 manifest_version: 0,
574 flushed_entry_id: 0,
575 }
576 }
577}
578
579impl RegionStatistic {
580 pub fn deserialize_from_slice(value: &[u8]) -> Option<RegionStatistic> {
584 serde_json::from_slice(value).ok()
585 }
586
587 pub fn serialize_to_vec(&self) -> Option<Vec<u8>> {
591 serde_json::to_vec(self).ok()
592 }
593}
594
595impl RegionStatistic {
596 pub fn estimated_disk_size(&self) -> u64 {
598 self.wal_size + self.sst_size + self.manifest_size + self.index_size
599 }
600}
601
602#[derive(Debug)]
604pub enum SyncManifestResponse {
605 NotSupported,
606 Mito {
607 synced: bool,
609 },
610 Metric {
611 metadata_synced: bool,
613 data_synced: bool,
615 new_opened_logical_region_ids: Vec<RegionId>,
618 },
619}
620
621impl SyncManifestResponse {
622 pub fn is_data_synced(&self) -> bool {
624 match self {
625 SyncManifestResponse::NotSupported => false,
626 SyncManifestResponse::Mito { synced } => *synced,
627 SyncManifestResponse::Metric { data_synced, .. } => *data_synced,
628 }
629 }
630
631 pub fn is_supported(&self) -> bool {
633 matches!(self, SyncManifestResponse::NotSupported)
634 }
635
636 pub fn is_mito(&self) -> bool {
638 matches!(self, SyncManifestResponse::Mito { .. })
639 }
640
641 pub fn is_metric(&self) -> bool {
643 matches!(self, SyncManifestResponse::Metric { .. })
644 }
645
646 pub fn new_opened_logical_region_ids(self) -> Option<Vec<RegionId>> {
648 match self {
649 SyncManifestResponse::Metric {
650 new_opened_logical_region_ids,
651 ..
652 } => Some(new_opened_logical_region_ids),
653 _ => None,
654 }
655 }
656}
657
658#[async_trait]
659pub trait RegionEngine: Send + Sync {
660 fn name(&self) -> &str;
662
663 async fn handle_batch_open_requests(
665 &self,
666 parallelism: usize,
667 requests: Vec<(RegionId, RegionOpenRequest)>,
668 ) -> Result<BatchResponses, BoxedError> {
669 let semaphore = Arc::new(Semaphore::new(parallelism));
670 let mut tasks = Vec::with_capacity(requests.len());
671
672 for (region_id, request) in requests {
673 let semaphore_moved = semaphore.clone();
674
675 tasks.push(async move {
676 let _permit = semaphore_moved.acquire().await.unwrap();
678 let result = self
679 .handle_request(region_id, RegionRequest::Open(request))
680 .await;
681 (region_id, result)
682 });
683 }
684
685 Ok(join_all(tasks).await)
686 }
687
688 async fn handle_batch_ddl_requests(
689 &self,
690 request: BatchRegionDdlRequest,
691 ) -> Result<RegionResponse, BoxedError> {
692 let requests = request.into_region_requests();
693
694 let mut affected_rows = 0;
695 let mut extensions = HashMap::new();
696
697 for (region_id, request) in requests {
698 let result = self.handle_request(region_id, request).await?;
699 affected_rows += result.affected_rows;
700 extensions.extend(result.extensions);
701 }
702
703 Ok(RegionResponse {
704 affected_rows,
705 extensions,
706 })
707 }
708
709 async fn handle_request(
711 &self,
712 region_id: RegionId,
713 request: RegionRequest,
714 ) -> Result<RegionResponse, BoxedError>;
715
716 async fn get_last_seq_num(
718 &self,
719 region_id: RegionId,
720 ) -> Result<Option<SequenceNumber>, BoxedError>;
721
722 async fn get_region_sequences(
723 &self,
724 seqs: RegionSequencesRequest,
725 ) -> Result<HashMap<u64, u64>, BoxedError> {
726 let mut results = HashMap::with_capacity(seqs.region_ids.len());
727
728 for region_id in seqs.region_ids {
729 let seq = self.get_last_seq_num(region_id).await?.unwrap_or_default();
730 results.insert(region_id.as_u64(), seq);
731 }
732
733 Ok(results)
734 }
735
736 async fn handle_query(
738 &self,
739 region_id: RegionId,
740 request: ScanRequest,
741 ) -> Result<RegionScannerRef, BoxedError>;
742
743 async fn get_metadata(&self, region_id: RegionId) -> Result<RegionMetadataRef, BoxedError>;
745
746 fn region_statistic(&self, region_id: RegionId) -> Option<RegionStatistic>;
748
749 async fn stop(&self) -> Result<(), BoxedError>;
751
752 fn set_region_role(&self, region_id: RegionId, role: RegionRole) -> Result<(), BoxedError>;
758
759 async fn sync_region(
761 &self,
762 region_id: RegionId,
763 manifest_info: RegionManifestInfo,
764 ) -> Result<SyncManifestResponse, BoxedError>;
765
766 async fn set_region_role_state_gracefully(
770 &self,
771 region_id: RegionId,
772 region_role_state: SettableRegionRoleState,
773 ) -> Result<SetRegionRoleStateResponse, BoxedError>;
774
775 fn role(&self, region_id: RegionId) -> Option<RegionRole>;
779
780 fn as_any(&self) -> &dyn Any;
781}
782
783pub type RegionEngineRef = Arc<dyn RegionEngine>;
784
785pub struct SinglePartitionScanner {
787 stream: Mutex<Option<SendableRecordBatchStream>>,
788 schema: SchemaRef,
789 properties: ScannerProperties,
790 metadata: RegionMetadataRef,
791}
792
793impl SinglePartitionScanner {
794 pub fn new(
796 stream: SendableRecordBatchStream,
797 append_mode: bool,
798 metadata: RegionMetadataRef,
799 ) -> Self {
800 let schema = stream.schema();
801 Self {
802 stream: Mutex::new(Some(stream)),
803 schema,
804 properties: ScannerProperties::default().with_append_mode(append_mode),
805 metadata,
806 }
807 }
808}
809
810impl Debug for SinglePartitionScanner {
811 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
812 write!(f, "SinglePartitionScanner: <SendableRecordBatchStream>")
813 }
814}
815
816impl RegionScanner for SinglePartitionScanner {
817 fn properties(&self) -> &ScannerProperties {
818 &self.properties
819 }
820
821 fn schema(&self) -> SchemaRef {
822 self.schema.clone()
823 }
824
825 fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> {
826 self.properties.prepare(request);
827 Ok(())
828 }
829
830 fn scan_partition(
831 &self,
832 _metrics_set: &ExecutionPlanMetricsSet,
833 _partition: usize,
834 ) -> Result<SendableRecordBatchStream, BoxedError> {
835 let mut stream = self.stream.lock().unwrap();
836 let result = stream
837 .take()
838 .or_else(|| Some(Box::pin(EmptyRecordBatchStream::new(self.schema.clone()))));
839 Ok(result.unwrap())
840 }
841
842 fn has_predicate(&self) -> bool {
843 false
844 }
845
846 fn metadata(&self) -> RegionMetadataRef {
847 self.metadata.clone()
848 }
849
850 fn set_logical_region(&mut self, logical_region: bool) {
851 self.properties.set_logical_region(logical_region);
852 }
853}
854
855impl DisplayAs for SinglePartitionScanner {
856 fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
857 write!(f, "{:?}", self)
858 }
859}