1use std::any::Any;
18use std::collections::HashMap;
19use std::fmt::{Debug, Display};
20use std::sync::{Arc, Mutex};
21
22use api::greptime_proto::v1::meta::{GrantedRegion as PbGrantedRegion, RegionRole as PbRegionRole};
23use api::region::RegionResponse;
24use async_trait::async_trait;
25use common_error::ext::BoxedError;
26use common_recordbatch::{EmptyRecordBatchStream, QueryMemoryTracker, SendableRecordBatchStream};
27use common_time::Timestamp;
28use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
29use datafusion_physical_plan::{DisplayAs, DisplayFormatType, PhysicalExpr};
30use datatypes::schema::SchemaRef;
31use futures::future::join_all;
32use serde::{Deserialize, Serialize};
33use tokio::sync::Semaphore;
34
35use crate::logstore::entry;
36use crate::metadata::RegionMetadataRef;
37use crate::region_request::{
38 BatchRegionDdlRequest, RegionCatchupRequest, RegionOpenRequest, RegionRequest,
39};
40use crate::storage::{FileId, RegionId, ScanRequest, SequenceNumber};
41
42#[derive(Debug, PartialEq, Eq, Clone, Copy)]
44pub enum SettableRegionRoleState {
45 Follower,
46 DowngradingLeader,
47 Leader,
49 StagingLeader,
51}
52
53impl Display for SettableRegionRoleState {
54 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
55 match self {
56 SettableRegionRoleState::Follower => write!(f, "Follower"),
57 SettableRegionRoleState::DowngradingLeader => write!(f, "Leader(Downgrading)"),
58 SettableRegionRoleState::Leader => write!(f, "Leader"),
59 SettableRegionRoleState::StagingLeader => write!(f, "Leader(Staging)"),
60 }
61 }
62}
63
64impl From<SettableRegionRoleState> for RegionRole {
65 fn from(value: SettableRegionRoleState) -> Self {
66 match value {
67 SettableRegionRoleState::Follower => RegionRole::Follower,
68 SettableRegionRoleState::DowngradingLeader => RegionRole::DowngradingLeader,
69 SettableRegionRoleState::Leader => RegionRole::Leader,
70 SettableRegionRoleState::StagingLeader => RegionRole::StagingLeader,
71 }
72 }
73}
74
75#[derive(Debug, PartialEq, Eq)]
77pub struct SetRegionRoleStateRequest {
78 region_id: RegionId,
79 region_role_state: SettableRegionRoleState,
80}
81
82#[derive(Debug, PartialEq, Eq)]
84pub enum SetRegionRoleStateSuccess {
85 File,
86 Mito {
87 last_entry_id: entry::Id,
88 },
89 Metric {
90 last_entry_id: entry::Id,
91 metadata_last_entry_id: entry::Id,
92 },
93}
94
95impl SetRegionRoleStateSuccess {
96 pub fn file() -> Self {
98 Self::File
99 }
100
101 pub fn mito(last_entry_id: entry::Id) -> Self {
103 SetRegionRoleStateSuccess::Mito { last_entry_id }
104 }
105
106 pub fn metric(last_entry_id: entry::Id, metadata_last_entry_id: entry::Id) -> Self {
108 SetRegionRoleStateSuccess::Metric {
109 last_entry_id,
110 metadata_last_entry_id,
111 }
112 }
113}
114
115impl SetRegionRoleStateSuccess {
116 pub fn last_entry_id(&self) -> Option<entry::Id> {
118 match self {
119 SetRegionRoleStateSuccess::File => None,
120 SetRegionRoleStateSuccess::Mito { last_entry_id } => Some(*last_entry_id),
121 SetRegionRoleStateSuccess::Metric { last_entry_id, .. } => Some(*last_entry_id),
122 }
123 }
124
125 pub fn metadata_last_entry_id(&self) -> Option<entry::Id> {
127 match self {
128 SetRegionRoleStateSuccess::File => None,
129 SetRegionRoleStateSuccess::Mito { .. } => None,
130 SetRegionRoleStateSuccess::Metric {
131 metadata_last_entry_id,
132 ..
133 } => Some(*metadata_last_entry_id),
134 }
135 }
136}
137
138#[derive(Debug)]
140pub enum SetRegionRoleStateResponse {
141 Success(SetRegionRoleStateSuccess),
142 NotFound,
143 InvalidTransition(BoxedError),
144}
145
146impl SetRegionRoleStateResponse {
147 pub fn success(success: SetRegionRoleStateSuccess) -> Self {
149 Self::Success(success)
150 }
151
152 pub fn invalid_transition(error: BoxedError) -> Self {
154 Self::InvalidTransition(error)
155 }
156
157 pub fn is_not_found(&self) -> bool {
159 matches!(self, SetRegionRoleStateResponse::NotFound)
160 }
161
162 pub fn is_invalid_transition(&self) -> bool {
164 matches!(self, SetRegionRoleStateResponse::InvalidTransition(_))
165 }
166}
167
168#[derive(Debug, Clone, PartialEq, Eq)]
169pub struct GrantedRegion {
170 pub region_id: RegionId,
171 pub region_role: RegionRole,
172 pub extensions: HashMap<String, Vec<u8>>,
173}
174
175impl GrantedRegion {
176 pub fn new(region_id: RegionId, region_role: RegionRole) -> Self {
177 Self {
178 region_id,
179 region_role,
180 extensions: HashMap::new(),
181 }
182 }
183}
184
185impl From<GrantedRegion> for PbGrantedRegion {
186 fn from(value: GrantedRegion) -> Self {
187 PbGrantedRegion {
188 region_id: value.region_id.as_u64(),
189 role: PbRegionRole::from(value.region_role).into(),
190 extensions: value.extensions,
191 }
192 }
193}
194
195impl From<PbGrantedRegion> for GrantedRegion {
196 fn from(value: PbGrantedRegion) -> Self {
197 GrantedRegion {
198 region_id: RegionId::from_u64(value.region_id),
199 region_role: value.role().into(),
200 extensions: value.extensions,
201 }
202 }
203}
204
205#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
208pub enum RegionRole {
209 Follower,
211 Leader,
213 StagingLeader,
218 DowngradingLeader,
222}
223
224impl Display for RegionRole {
225 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
226 match self {
227 RegionRole::Follower => write!(f, "Follower"),
228 RegionRole::Leader => write!(f, "Leader"),
229 RegionRole::StagingLeader => write!(f, "Leader(Staging)"),
230 RegionRole::DowngradingLeader => write!(f, "Leader(Downgrading)"),
231 }
232 }
233}
234
235impl RegionRole {
236 pub fn writable(&self) -> bool {
237 matches!(self, RegionRole::Leader | RegionRole::StagingLeader)
238 }
239}
240
241impl From<RegionRole> for PbRegionRole {
242 fn from(value: RegionRole) -> Self {
243 match value {
244 RegionRole::Follower => PbRegionRole::Follower,
245 RegionRole::Leader => PbRegionRole::Leader,
246 RegionRole::StagingLeader => PbRegionRole::StagingLeader,
247 RegionRole::DowngradingLeader => PbRegionRole::DowngradingLeader,
248 }
249 }
250}
251
252impl From<PbRegionRole> for RegionRole {
253 fn from(value: PbRegionRole) -> Self {
254 match value {
255 PbRegionRole::Leader => RegionRole::Leader,
256 PbRegionRole::StagingLeader => RegionRole::StagingLeader,
257 PbRegionRole::Follower => RegionRole::Follower,
258 PbRegionRole::DowngradingLeader => RegionRole::DowngradingLeader,
259 }
260 }
261}
262
263#[derive(Debug)]
265pub enum ScannerPartitioning {
266 Unknown(usize),
268}
269
270impl ScannerPartitioning {
271 pub fn num_partitions(&self) -> usize {
273 match self {
274 ScannerPartitioning::Unknown(num_partitions) => *num_partitions,
275 }
276 }
277}
278
279#[derive(Debug, Clone, Copy, PartialEq, Eq)]
281pub struct PartitionRange {
282 pub start: Timestamp,
284 pub end: Timestamp,
286 pub num_rows: usize,
288 pub identifier: usize,
290}
291
292#[derive(Debug, Default)]
294pub struct ScannerProperties {
295 pub partitions: Vec<Vec<PartitionRange>>,
300
301 append_mode: bool,
303
304 total_rows: usize,
307
308 pub distinguish_partition_range: bool,
310
311 target_partitions: usize,
313
314 logical_region: bool,
316
317 query_load_region_id: Option<RegionId>,
319}
320
321impl ScannerProperties {
322 pub fn with_append_mode(mut self, append_mode: bool) -> Self {
324 self.append_mode = append_mode;
325 self
326 }
327
328 pub fn with_total_rows(mut self, total_rows: usize) -> Self {
330 self.total_rows = total_rows;
331 self
332 }
333
334 pub fn new(partitions: Vec<Vec<PartitionRange>>, append_mode: bool, total_rows: usize) -> Self {
336 Self {
337 partitions,
338 append_mode,
339 total_rows,
340 distinguish_partition_range: false,
341 target_partitions: 0,
342 logical_region: false,
343 query_load_region_id: None,
344 }
345 }
346
347 pub fn prepare(&mut self, request: PrepareRequest) {
349 if let Some(ranges) = request.ranges {
350 self.partitions = ranges;
351 }
352 if let Some(distinguish_partition_range) = request.distinguish_partition_range {
353 self.distinguish_partition_range = distinguish_partition_range;
354 }
355 if let Some(target_partitions) = request.target_partitions {
356 self.target_partitions = target_partitions;
357 }
358 }
359
360 pub fn num_partitions(&self) -> usize {
362 self.partitions.len()
363 }
364
365 pub fn append_mode(&self) -> bool {
366 self.append_mode
367 }
368
369 pub fn total_rows(&self) -> usize {
370 self.total_rows
371 }
372
373 pub fn is_logical_region(&self) -> bool {
375 self.logical_region
376 }
377
378 pub fn target_partitions(&self) -> usize {
380 if self.target_partitions == 0 {
381 self.num_partitions()
382 } else {
383 self.target_partitions
384 }
385 }
386
387 pub fn set_logical_region(&mut self, logical_region: bool) {
389 self.logical_region = logical_region;
390 }
391
392 pub fn query_load_region_id(&self) -> Option<RegionId> {
394 self.query_load_region_id
395 }
396
397 pub fn set_query_load_region_id(&mut self, region_id: RegionId) {
399 self.query_load_region_id = Some(region_id);
400 }
401}
402
403#[derive(Default)]
405pub struct PrepareRequest {
406 pub ranges: Option<Vec<Vec<PartitionRange>>>,
408 pub distinguish_partition_range: Option<bool>,
410 pub target_partitions: Option<usize>,
412}
413
414impl PrepareRequest {
415 pub fn with_ranges(mut self, ranges: Vec<Vec<PartitionRange>>) -> Self {
417 self.ranges = Some(ranges);
418 self
419 }
420
421 pub fn with_distinguish_partition_range(mut self, distinguish_partition_range: bool) -> Self {
423 self.distinguish_partition_range = Some(distinguish_partition_range);
424 self
425 }
426
427 pub fn with_target_partitions(mut self, target_partitions: usize) -> Self {
429 self.target_partitions = Some(target_partitions);
430 self
431 }
432}
433
434#[derive(Clone, Default)]
436pub struct QueryScanContext {
437 pub explain_verbose: bool,
439}
440
441pub trait RegionScanner: Debug + DisplayAs + Send {
446 fn name(&self) -> &str;
447
448 fn properties(&self) -> &ScannerProperties;
450
451 fn schema(&self) -> SchemaRef;
453
454 fn metadata(&self) -> RegionMetadataRef;
456
457 fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError>;
461
462 fn scan_partition(
467 &self,
468 ctx: &QueryScanContext,
469 metrics_set: &ExecutionPlanMetricsSet,
470 partition: usize,
471 ) -> Result<SendableRecordBatchStream, BoxedError>;
472
473 fn has_predicate_without_region(&self) -> bool;
475
476 fn add_dyn_filter_to_predicate(
481 &mut self,
482 filter_exprs: Vec<Arc<dyn PhysicalExpr>>,
483 ) -> Vec<bool>;
484
485 fn set_logical_region(&mut self, logical_region: bool);
487
488 fn set_query_load_region_id(&mut self, region_id: RegionId);
490
491 fn snapshot_sequence(&self) -> Option<SequenceNumber> {
492 None
493 }
494}
495
496pub type RegionScannerRef = Box<dyn RegionScanner>;
497
498pub type BatchResponses = Vec<(RegionId, Result<RegionResponse, BoxedError>)>;
499
500#[derive(Debug, Deserialize, Serialize, Default)]
502pub struct RegionStatistic {
503 #[serde(default)]
508 pub num_rows: u64,
509 pub memtable_size: u64,
511 pub wal_size: u64,
513 pub manifest_size: u64,
515 pub sst_size: u64,
519 pub sst_num: u64,
523 #[serde(default)]
527 pub index_size: u64,
528 #[serde(default)]
530 pub manifest: RegionManifestInfo,
531 #[serde(default)]
532 pub written_bytes: u64,
534 #[serde(default)]
538 pub data_topic_latest_entry_id: u64,
539 #[serde(default)]
540 pub metadata_topic_latest_entry_id: u64,
541}
542
543#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
545pub enum RegionManifestInfo {
546 Mito {
547 manifest_version: u64,
548 flushed_entry_id: u64,
549 file_removed_cnt: u64,
551 },
552 Metric {
553 data_manifest_version: u64,
554 data_flushed_entry_id: u64,
555 metadata_manifest_version: u64,
556 metadata_flushed_entry_id: u64,
557 },
558}
559
560impl RegionManifestInfo {
561 pub fn mito(manifest_version: u64, flushed_entry_id: u64, file_removal_rate: u64) -> Self {
563 Self::Mito {
564 manifest_version,
565 flushed_entry_id,
566 file_removed_cnt: file_removal_rate,
567 }
568 }
569
570 pub fn metric(
572 data_manifest_version: u64,
573 data_flushed_entry_id: u64,
574 metadata_manifest_version: u64,
575 metadata_flushed_entry_id: u64,
576 ) -> Self {
577 Self::Metric {
578 data_manifest_version,
579 data_flushed_entry_id,
580 metadata_manifest_version,
581 metadata_flushed_entry_id,
582 }
583 }
584
585 pub fn is_mito(&self) -> bool {
587 matches!(self, RegionManifestInfo::Mito { .. })
588 }
589
590 pub fn is_metric(&self) -> bool {
592 matches!(self, RegionManifestInfo::Metric { .. })
593 }
594
595 pub fn data_flushed_entry_id(&self) -> u64 {
597 match self {
598 RegionManifestInfo::Mito {
599 flushed_entry_id, ..
600 } => *flushed_entry_id,
601 RegionManifestInfo::Metric {
602 data_flushed_entry_id,
603 ..
604 } => *data_flushed_entry_id,
605 }
606 }
607
608 pub fn data_manifest_version(&self) -> u64 {
610 match self {
611 RegionManifestInfo::Mito {
612 manifest_version, ..
613 } => *manifest_version,
614 RegionManifestInfo::Metric {
615 data_manifest_version,
616 ..
617 } => *data_manifest_version,
618 }
619 }
620
621 pub fn metadata_manifest_version(&self) -> Option<u64> {
623 match self {
624 RegionManifestInfo::Mito { .. } => None,
625 RegionManifestInfo::Metric {
626 metadata_manifest_version,
627 ..
628 } => Some(*metadata_manifest_version),
629 }
630 }
631
632 pub fn metadata_flushed_entry_id(&self) -> Option<u64> {
634 match self {
635 RegionManifestInfo::Mito { .. } => None,
636 RegionManifestInfo::Metric {
637 metadata_flushed_entry_id,
638 ..
639 } => Some(*metadata_flushed_entry_id),
640 }
641 }
642
643 pub fn encode_list(manifest_infos: &[(RegionId, Self)]) -> serde_json::Result<Vec<u8>> {
645 serde_json::to_vec(manifest_infos)
646 }
647
648 pub fn decode_list(value: &[u8]) -> serde_json::Result<Vec<(RegionId, Self)>> {
650 serde_json::from_slice(value)
651 }
652}
653
654impl Default for RegionManifestInfo {
655 fn default() -> Self {
656 Self::Mito {
657 manifest_version: 0,
658 flushed_entry_id: 0,
659 file_removed_cnt: 0,
660 }
661 }
662}
663
664impl RegionStatistic {
665 pub fn deserialize_from_slice(value: &[u8]) -> Option<RegionStatistic> {
669 serde_json::from_slice(value).ok()
670 }
671
672 pub fn serialize_to_vec(&self) -> Option<Vec<u8>> {
676 serde_json::to_vec(self).ok()
677 }
678}
679
680impl RegionStatistic {
681 pub fn estimated_disk_size(&self) -> u64 {
683 self.wal_size + self.sst_size + self.manifest_size + self.index_size
684 }
685}
686
687#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
689pub enum SyncRegionFromRequest {
690 FromManifest(RegionManifestInfo),
693 FromRegion {
699 source_region_id: RegionId,
701 parallelism: usize,
703 },
704}
705
706impl From<RegionManifestInfo> for SyncRegionFromRequest {
707 fn from(manifest_info: RegionManifestInfo) -> Self {
708 SyncRegionFromRequest::FromManifest(manifest_info)
709 }
710}
711
712impl SyncRegionFromRequest {
713 pub fn from_manifest(manifest_info: RegionManifestInfo) -> Self {
715 SyncRegionFromRequest::FromManifest(manifest_info)
716 }
717
718 pub fn from_region(source_region_id: RegionId, parallelism: usize) -> Self {
720 SyncRegionFromRequest::FromRegion {
721 source_region_id,
722 parallelism,
723 }
724 }
725
726 pub fn is_from_manifest(&self) -> bool {
728 matches!(self, SyncRegionFromRequest::FromManifest { .. })
729 }
730
731 pub fn into_region_manifest_info(self) -> Option<RegionManifestInfo> {
735 match self {
736 SyncRegionFromRequest::FromManifest(manifest_info) => Some(manifest_info),
737 SyncRegionFromRequest::FromRegion { .. } => None,
738 }
739 }
740}
741
742#[derive(Debug)]
744pub enum SyncRegionFromResponse {
745 NotSupported,
746 Mito {
747 synced: bool,
749 },
750 Metric {
751 metadata_synced: bool,
753 data_synced: bool,
755 new_opened_logical_region_ids: Vec<RegionId>,
758 },
759}
760
761impl SyncRegionFromResponse {
762 pub fn is_data_synced(&self) -> bool {
764 match self {
765 SyncRegionFromResponse::NotSupported => false,
766 SyncRegionFromResponse::Mito { synced } => *synced,
767 SyncRegionFromResponse::Metric { data_synced, .. } => *data_synced,
768 }
769 }
770
771 pub fn is_mito(&self) -> bool {
773 matches!(self, SyncRegionFromResponse::Mito { .. })
774 }
775
776 pub fn is_metric(&self) -> bool {
778 matches!(self, SyncRegionFromResponse::Metric { .. })
779 }
780
781 pub fn new_opened_logical_region_ids(self) -> Option<Vec<RegionId>> {
783 match self {
784 SyncRegionFromResponse::Metric {
785 new_opened_logical_region_ids,
786 ..
787 } => Some(new_opened_logical_region_ids),
788 _ => None,
789 }
790 }
791}
792
793#[derive(Debug, Clone)]
795pub struct RemapManifestsRequest {
796 pub region_id: RegionId,
798 pub input_regions: Vec<RegionId>,
800 pub region_mapping: HashMap<RegionId, Vec<RegionId>>,
802 pub new_partition_exprs: HashMap<RegionId, String>,
804}
805
806#[derive(Debug, Clone)]
808pub struct RemapManifestsResponse {
809 pub manifest_paths: HashMap<RegionId, String>,
814}
815
816#[derive(Debug, Clone)]
818pub struct MitoCopyRegionFromRequest {
819 pub source_region_id: RegionId,
821 pub parallelism: usize,
823}
824
825#[derive(Debug, Clone)]
826pub struct MitoCopyRegionFromResponse {
827 pub copied_file_ids: Vec<FileId>,
829}
830
831#[async_trait]
832pub trait RegionEngine: Send + Sync {
833 fn name(&self) -> &str;
835
836 async fn handle_batch_open_requests(
838 &self,
839 parallelism: usize,
840 requests: Vec<(RegionId, RegionOpenRequest)>,
841 ) -> Result<BatchResponses, BoxedError> {
842 let semaphore = Arc::new(Semaphore::new(parallelism));
843 let mut tasks = Vec::with_capacity(requests.len());
844
845 for (region_id, request) in requests {
846 let semaphore_moved = semaphore.clone();
847
848 tasks.push(async move {
849 let _permit = semaphore_moved.acquire().await.unwrap();
851 let result = self
852 .handle_request(region_id, RegionRequest::Open(request))
853 .await;
854 (region_id, result)
855 });
856 }
857
858 Ok(join_all(tasks).await)
859 }
860
861 async fn handle_batch_catchup_requests(
862 &self,
863 parallelism: usize,
864 requests: Vec<(RegionId, RegionCatchupRequest)>,
865 ) -> Result<BatchResponses, BoxedError> {
866 let semaphore = Arc::new(Semaphore::new(parallelism));
867 let mut tasks = Vec::with_capacity(requests.len());
868
869 for (region_id, request) in requests {
870 let semaphore_moved = semaphore.clone();
871
872 tasks.push(async move {
873 let _permit = semaphore_moved.acquire().await.unwrap();
875 let result = self
876 .handle_request(region_id, RegionRequest::Catchup(request))
877 .await;
878 (region_id, result)
879 });
880 }
881
882 Ok(join_all(tasks).await)
883 }
884
885 async fn handle_batch_ddl_requests(
886 &self,
887 request: BatchRegionDdlRequest,
888 ) -> Result<RegionResponse, BoxedError> {
889 let requests = request.into_region_requests();
890
891 let mut affected_rows = 0;
892 let mut extensions = HashMap::new();
893
894 for (region_id, request) in requests {
895 let result = self.handle_request(region_id, request).await?;
896 affected_rows += result.affected_rows;
897 extensions.extend(result.extensions);
898 }
899
900 Ok(RegionResponse {
901 affected_rows,
902 extensions,
903 metadata: Vec::new(),
904 })
905 }
906
907 async fn handle_request(
909 &self,
910 region_id: RegionId,
911 request: RegionRequest,
912 ) -> Result<RegionResponse, BoxedError>;
913
914 async fn get_committed_sequence(
916 &self,
917 region_id: RegionId,
918 ) -> Result<SequenceNumber, BoxedError>;
919
920 async fn handle_query(
922 &self,
923 region_id: RegionId,
924 request: ScanRequest,
925 ) -> Result<RegionScannerRef, BoxedError>;
926
927 fn query_memory_tracker(&self) -> Option<QueryMemoryTracker> {
929 None
930 }
931
932 async fn get_metadata(&self, region_id: RegionId) -> Result<RegionMetadataRef, BoxedError>;
934
935 fn region_statistic(&self, region_id: RegionId) -> Option<RegionStatistic>;
937
938 async fn stop(&self) -> Result<(), BoxedError>;
940
941 fn set_region_role(&self, region_id: RegionId, role: RegionRole) -> Result<(), BoxedError>;
947
948 async fn sync_region(
950 &self,
951 region_id: RegionId,
952 request: SyncRegionFromRequest,
953 ) -> Result<SyncRegionFromResponse, BoxedError>;
954
955 async fn remap_manifests(
957 &self,
958 request: RemapManifestsRequest,
959 ) -> Result<RemapManifestsResponse, BoxedError>;
960
961 async fn set_region_role_state_gracefully(
965 &self,
966 region_id: RegionId,
967 region_role_state: SettableRegionRoleState,
968 ) -> Result<SetRegionRoleStateResponse, BoxedError>;
969
970 fn role(&self, region_id: RegionId) -> Option<RegionRole>;
974
975 fn as_any(&self) -> &dyn Any;
976}
977
978pub type RegionEngineRef = Arc<dyn RegionEngine>;
979
980pub struct SinglePartitionScanner {
982 stream: Mutex<Option<SendableRecordBatchStream>>,
983 schema: SchemaRef,
984 properties: ScannerProperties,
985 metadata: RegionMetadataRef,
986 snapshot_sequence: Option<SequenceNumber>,
987}
988
989impl SinglePartitionScanner {
990 pub fn new(
992 stream: SendableRecordBatchStream,
993 append_mode: bool,
994 metadata: RegionMetadataRef,
995 snapshot_sequence: Option<SequenceNumber>,
996 ) -> Self {
997 let schema = stream.schema();
998 Self {
999 stream: Mutex::new(Some(stream)),
1000 schema,
1001 properties: ScannerProperties::default().with_append_mode(append_mode),
1002 metadata,
1003 snapshot_sequence,
1004 }
1005 }
1006}
1007
1008impl Debug for SinglePartitionScanner {
1009 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1010 write!(f, "SinglePartitionScanner: <SendableRecordBatchStream>")
1011 }
1012}
1013
1014impl RegionScanner for SinglePartitionScanner {
1015 fn name(&self) -> &str {
1016 "SinglePartition"
1017 }
1018
1019 fn properties(&self) -> &ScannerProperties {
1020 &self.properties
1021 }
1022
1023 fn schema(&self) -> SchemaRef {
1024 self.schema.clone()
1025 }
1026
1027 fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> {
1028 self.properties.prepare(request);
1029 Ok(())
1030 }
1031
1032 fn scan_partition(
1033 &self,
1034 _ctx: &QueryScanContext,
1035 _metrics_set: &ExecutionPlanMetricsSet,
1036 _partition: usize,
1037 ) -> Result<SendableRecordBatchStream, BoxedError> {
1038 let mut stream = self.stream.lock().unwrap();
1039 let result = stream
1040 .take()
1041 .or_else(|| Some(Box::pin(EmptyRecordBatchStream::new(self.schema.clone()))));
1042 Ok(result.unwrap())
1043 }
1044
1045 fn has_predicate_without_region(&self) -> bool {
1046 false
1047 }
1048
1049 fn add_dyn_filter_to_predicate(
1050 &mut self,
1051 filter_exprs: Vec<Arc<dyn datafusion_physical_plan::PhysicalExpr>>,
1052 ) -> Vec<bool> {
1053 vec![false; filter_exprs.len()]
1054 }
1055
1056 fn metadata(&self) -> RegionMetadataRef {
1057 self.metadata.clone()
1058 }
1059
1060 fn set_logical_region(&mut self, logical_region: bool) {
1061 self.properties.set_logical_region(logical_region);
1062 }
1063
1064 fn set_query_load_region_id(&mut self, region_id: RegionId) {
1065 self.properties.set_query_load_region_id(region_id);
1066 }
1067
1068 fn snapshot_sequence(&self) -> Option<SequenceNumber> {
1069 self.snapshot_sequence
1070 }
1071}
1072
1073impl DisplayAs for SinglePartitionScanner {
1074 fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
1075 write!(f, "{:?}", self)
1076 }
1077}