1use std::any::Any;
18use std::collections::HashMap;
19use std::fmt::{Debug, Display};
20use std::sync::{Arc, Mutex};
21
22use api::greptime_proto::v1::meta::{GrantedRegion as PbGrantedRegion, RegionRole as PbRegionRole};
23use api::region::RegionResponse;
24use async_trait::async_trait;
25use common_error::ext::BoxedError;
26use common_recordbatch::{EmptyRecordBatchStream, QueryMemoryTracker, SendableRecordBatchStream};
27use common_time::Timestamp;
28use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
29use datafusion_physical_plan::{DisplayAs, DisplayFormatType, PhysicalExpr};
30use datatypes::schema::SchemaRef;
31use futures::future::join_all;
32use serde::{Deserialize, Serialize};
33use tokio::sync::Semaphore;
34
35use crate::logstore::entry;
36use crate::metadata::RegionMetadataRef;
37use crate::region_request::{
38 BatchRegionDdlRequest, RegionCatchupRequest, RegionOpenRequest, RegionRequest,
39};
40use crate::storage::{FileId, RegionId, ScanRequest, SequenceNumber};
41
/// Region role states that can be requested through
/// `RegionEngine::set_region_role_state_gracefully`.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum SettableRegionRoleState {
    /// The region acts as a follower.
    Follower,
    /// The region is a leader that is being downgraded.
    DowngradingLeader,
    /// The region acts as a leader.
    Leader,
    /// The region is a leader in staging state; it converts to `RegionRole::Leader`.
    StagingLeader,
}
52
53impl Display for SettableRegionRoleState {
54 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
55 match self {
56 SettableRegionRoleState::Follower => write!(f, "Follower"),
57 SettableRegionRoleState::DowngradingLeader => write!(f, "Leader(Downgrading)"),
58 SettableRegionRoleState::Leader => write!(f, "Leader"),
59 SettableRegionRoleState::StagingLeader => write!(f, "Leader(Staging)"),
60 }
61 }
62}
63
64impl From<SettableRegionRoleState> for RegionRole {
65 fn from(value: SettableRegionRoleState) -> Self {
66 match value {
67 SettableRegionRoleState::Follower => RegionRole::Follower,
68 SettableRegionRoleState::DowngradingLeader => RegionRole::DowngradingLeader,
69 SettableRegionRoleState::Leader => RegionRole::Leader,
70 SettableRegionRoleState::StagingLeader => RegionRole::Leader, }
72 }
73}
74
/// A request to set the role state of a single region.
#[derive(Debug, PartialEq, Eq)]
pub struct SetRegionRoleStateRequest {
    /// The region whose role state should change.
    region_id: RegionId,
    /// The role state to apply to the region.
    region_role_state: SettableRegionRoleState,
}
81
/// Payload of a successful role-state change, with one variant per engine kind.
#[derive(Debug, PartialEq, Eq)]
pub enum SetRegionRoleStateSuccess {
    /// Success without any entry id information.
    File,
    /// Success carrying a single last entry id.
    Mito {
        /// The last entry id reported for the region.
        last_entry_id: entry::Id,
    },
    /// Success carrying separate last entry ids for data and metadata.
    Metric {
        /// The last entry id reported for the data part.
        last_entry_id: entry::Id,
        /// The last entry id reported for the metadata part.
        metadata_last_entry_id: entry::Id,
    },
}
94
95impl SetRegionRoleStateSuccess {
96 pub fn file() -> Self {
98 Self::File
99 }
100
101 pub fn mito(last_entry_id: entry::Id) -> Self {
103 SetRegionRoleStateSuccess::Mito { last_entry_id }
104 }
105
106 pub fn metric(last_entry_id: entry::Id, metadata_last_entry_id: entry::Id) -> Self {
108 SetRegionRoleStateSuccess::Metric {
109 last_entry_id,
110 metadata_last_entry_id,
111 }
112 }
113}
114
115impl SetRegionRoleStateSuccess {
116 pub fn last_entry_id(&self) -> Option<entry::Id> {
118 match self {
119 SetRegionRoleStateSuccess::File => None,
120 SetRegionRoleStateSuccess::Mito { last_entry_id } => Some(*last_entry_id),
121 SetRegionRoleStateSuccess::Metric { last_entry_id, .. } => Some(*last_entry_id),
122 }
123 }
124
125 pub fn metadata_last_entry_id(&self) -> Option<entry::Id> {
127 match self {
128 SetRegionRoleStateSuccess::File => None,
129 SetRegionRoleStateSuccess::Mito { .. } => None,
130 SetRegionRoleStateSuccess::Metric {
131 metadata_last_entry_id,
132 ..
133 } => Some(*metadata_last_entry_id),
134 }
135 }
136}
137
/// Outcome of a request to set a region's role state.
#[derive(Debug)]
pub enum SetRegionRoleStateResponse {
    /// The role state was set; carries the engine-specific success payload.
    Success(SetRegionRoleStateSuccess),
    /// The target region was not found.
    NotFound,
    /// The requested transition was rejected; carries the underlying error.
    InvalidTransition(BoxedError),
}
145
146impl SetRegionRoleStateResponse {
147 pub fn success(success: SetRegionRoleStateSuccess) -> Self {
149 Self::Success(success)
150 }
151
152 pub fn invalid_transition(error: BoxedError) -> Self {
154 Self::InvalidTransition(error)
155 }
156
157 pub fn is_not_found(&self) -> bool {
159 matches!(self, SetRegionRoleStateResponse::NotFound)
160 }
161
162 pub fn is_invalid_transition(&self) -> bool {
164 matches!(self, SetRegionRoleStateResponse::InvalidTransition(_))
165 }
166}
167
/// A region together with the role granted to it; convertible to and from
/// `PbGrantedRegion`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct GrantedRegion {
    /// Id of the granted region.
    pub region_id: RegionId,
    /// Role granted to the region.
    pub region_role: RegionRole,
    /// Extension payloads keyed by name, carried through conversions unchanged.
    pub extensions: HashMap<String, Vec<u8>>,
}
174
175impl GrantedRegion {
176 pub fn new(region_id: RegionId, region_role: RegionRole) -> Self {
177 Self {
178 region_id,
179 region_role,
180 extensions: HashMap::new(),
181 }
182 }
183}
184
185impl From<GrantedRegion> for PbGrantedRegion {
186 fn from(value: GrantedRegion) -> Self {
187 PbGrantedRegion {
188 region_id: value.region_id.as_u64(),
189 role: PbRegionRole::from(value.region_role).into(),
190 extensions: value.extensions,
191 }
192 }
193}
194
195impl From<PbGrantedRegion> for GrantedRegion {
196 fn from(value: PbGrantedRegion) -> Self {
197 GrantedRegion {
198 region_id: RegionId::from_u64(value.region_id),
199 region_role: value.role().into(),
200 extensions: value.extensions,
201 }
202 }
203}
204
/// The role of a region; only `Leader` is reported as writable by
/// `RegionRole::writable`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum RegionRole {
    /// The region is a follower.
    Follower,
    /// The region is a leader.
    Leader,
    /// The region is a leader that is being downgraded.
    DowngradingLeader,
}
218
219impl Display for RegionRole {
220 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
221 match self {
222 RegionRole::Follower => write!(f, "Follower"),
223 RegionRole::Leader => write!(f, "Leader"),
224 RegionRole::DowngradingLeader => write!(f, "Leader(Downgrading)"),
225 }
226 }
227}
228
229impl RegionRole {
230 pub fn writable(&self) -> bool {
231 matches!(self, RegionRole::Leader)
232 }
233}
234
235impl From<RegionRole> for PbRegionRole {
236 fn from(value: RegionRole) -> Self {
237 match value {
238 RegionRole::Follower => PbRegionRole::Follower,
239 RegionRole::Leader => PbRegionRole::Leader,
240 RegionRole::DowngradingLeader => PbRegionRole::DowngradingLeader,
241 }
242 }
243}
244
245impl From<PbRegionRole> for RegionRole {
246 fn from(value: PbRegionRole) -> Self {
247 match value {
248 PbRegionRole::Leader => RegionRole::Leader,
249 PbRegionRole::Follower => RegionRole::Follower,
250 PbRegionRole::DowngradingLeader => RegionRole::DowngradingLeader,
251 }
252 }
253}
254
/// Output partitioning of a scanner.
#[derive(Debug)]
pub enum ScannerPartitioning {
    /// Partitioning scheme is unknown; the payload is the number of partitions.
    Unknown(usize),
}
261
262impl ScannerPartitioning {
263 pub fn num_partitions(&self) -> usize {
265 match self {
266 ScannerPartitioning::Unknown(num_partitions) => *num_partitions,
267 }
268 }
269}
270
/// A time range handled as one unit inside a scan partition.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct PartitionRange {
    /// Start timestamp of the range.
    /// NOTE(review): whether the bound is inclusive is not visible here — confirm.
    pub start: Timestamp,
    /// End timestamp of the range.
    /// NOTE(review): whether the bound is inclusive is not visible here — confirm.
    pub end: Timestamp,
    /// Number of rows in the range.
    pub num_rows: usize,
    /// Identifier of the range; presumably scanner-specific — confirm with producers.
    pub identifier: usize,
}
283
/// Properties describing a scanner's output and how it was prepared.
#[derive(Debug, Default)]
pub struct ScannerProperties {
    /// Partition ranges grouped by partition; the outer length is the partition count.
    pub partitions: Vec<Vec<PartitionRange>>,

    /// Whether the scanner runs in append mode.
    append_mode: bool,

    /// Total number of rows reported for the scan.
    total_rows: usize,

    /// Whether output should distinguish individual partition ranges.
    pub distinguish_partition_range: bool,

    /// Requested number of target partitions; `0` means "use the partition count".
    target_partitions: usize,

    /// Whether the scanned region is a logical region.
    logical_region: bool,
}
309
310impl ScannerProperties {
311 pub fn with_append_mode(mut self, append_mode: bool) -> Self {
313 self.append_mode = append_mode;
314 self
315 }
316
317 pub fn with_total_rows(mut self, total_rows: usize) -> Self {
319 self.total_rows = total_rows;
320 self
321 }
322
323 pub fn new(partitions: Vec<Vec<PartitionRange>>, append_mode: bool, total_rows: usize) -> Self {
325 Self {
326 partitions,
327 append_mode,
328 total_rows,
329 distinguish_partition_range: false,
330 target_partitions: 0,
331 logical_region: false,
332 }
333 }
334
335 pub fn prepare(&mut self, request: PrepareRequest) {
337 if let Some(ranges) = request.ranges {
338 self.partitions = ranges;
339 }
340 if let Some(distinguish_partition_range) = request.distinguish_partition_range {
341 self.distinguish_partition_range = distinguish_partition_range;
342 }
343 if let Some(target_partitions) = request.target_partitions {
344 self.target_partitions = target_partitions;
345 }
346 }
347
348 pub fn num_partitions(&self) -> usize {
350 self.partitions.len()
351 }
352
353 pub fn append_mode(&self) -> bool {
354 self.append_mode
355 }
356
357 pub fn total_rows(&self) -> usize {
358 self.total_rows
359 }
360
361 pub fn is_logical_region(&self) -> bool {
363 self.logical_region
364 }
365
366 pub fn target_partitions(&self) -> usize {
368 if self.target_partitions == 0 {
369 self.num_partitions()
370 } else {
371 self.target_partitions
372 }
373 }
374
375 pub fn set_logical_region(&mut self, logical_region: bool) {
377 self.logical_region = logical_region;
378 }
379}
380
/// Optional updates applied to a scanner's properties; `None` fields leave the
/// current value untouched.
#[derive(Default)]
pub struct PrepareRequest {
    /// New partition ranges, if they should be replaced.
    pub ranges: Option<Vec<Vec<PartitionRange>>>,
    /// New value for the "distinguish partition range" flag, if it should change.
    pub distinguish_partition_range: Option<bool>,
    /// New target partition count, if it should change.
    pub target_partitions: Option<usize>,
}
391
392impl PrepareRequest {
393 pub fn with_ranges(mut self, ranges: Vec<Vec<PartitionRange>>) -> Self {
395 self.ranges = Some(ranges);
396 self
397 }
398
399 pub fn with_distinguish_partition_range(mut self, distinguish_partition_range: bool) -> Self {
401 self.distinguish_partition_range = Some(distinguish_partition_range);
402 self
403 }
404
405 pub fn with_target_partitions(mut self, target_partitions: usize) -> Self {
407 self.target_partitions = Some(target_partitions);
408 self
409 }
410}
411
/// Per-query context handed to `RegionScanner::scan_partition`.
#[derive(Clone, Default)]
pub struct QueryScanContext {
    /// Whether the query runs with verbose explain enabled.
    pub explain_verbose: bool,
}
418
/// A scanner that exposes the data of a region as per-partition record batch streams.
pub trait RegionScanner: Debug + DisplayAs + Send {
    /// Returns the name of the scanner.
    fn name(&self) -> &str;

    /// Returns the current properties of the scanner.
    fn properties(&self) -> &ScannerProperties;

    /// Returns the schema of the record batches produced by the scanner.
    fn schema(&self) -> SchemaRef;

    /// Returns the metadata of the scanned region.
    fn metadata(&self) -> RegionMetadataRef;

    /// Prepares the scanner with the given request, updating its properties.
    fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError>;

    /// Scans the partition with index `partition` and returns its record batch stream.
    fn scan_partition(
        &self,
        ctx: &QueryScanContext,
        metrics_set: &ExecutionPlanMetricsSet,
        partition: usize,
    ) -> Result<SendableRecordBatchStream, BoxedError>;

    /// Reports whether the scanner has a predicate other than region-level ones.
    /// NOTE(review): exact meaning of "without region" is not visible here — confirm.
    fn has_predicate_without_region(&self) -> bool;

    /// Offers dynamic filter expressions to the scanner.
    ///
    /// NOTE(review): the returned flags presumably correspond one-to-one to
    /// `filter_exprs` and mark which filters were accepted — confirm with implementors.
    fn add_dyn_filter_to_predicate(
        &mut self,
        filter_exprs: Vec<Arc<dyn PhysicalExpr>>,
    ) -> Vec<bool>;

    /// Marks whether the scanned region is a logical region.
    fn set_logical_region(&mut self, logical_region: bool);

    /// Returns the snapshot sequence of the scan, if any; `None` by default.
    fn snapshot_sequence(&self) -> Option<SequenceNumber> {
        None
    }
}
470
/// An owned, boxed [`RegionScanner`] trait object.
pub type RegionScannerRef = Box<dyn RegionScanner>;
472
/// Per-region results of a batch operation, paired with the region id they belong to.
pub type BatchResponses = Vec<(RegionId, Result<RegionResponse, BoxedError>)>;
474
/// Statistics of a region; serialized with serde.
///
/// Fields marked `#[serde(default)]` may be absent from the serialized form and then
/// take their default value on deserialization.
#[derive(Debug, Deserialize, Serialize, Default)]
pub struct RegionStatistic {
    /// Number of rows in the region.
    #[serde(default)]
    pub num_rows: u64,
    /// Size of the memtable (presumably in bytes — confirm).
    pub memtable_size: u64,
    /// Size of the WAL; part of the estimated disk size.
    pub wal_size: u64,
    /// Size of the manifest; part of the estimated disk size.
    pub manifest_size: u64,
    /// Size of the SST files; part of the estimated disk size.
    pub sst_size: u64,
    /// Number of SST files.
    pub sst_num: u64,
    /// Size of the index files; part of the estimated disk size.
    #[serde(default)]
    pub index_size: u64,
    /// Manifest information of the region.
    #[serde(default)]
    pub manifest: RegionManifestInfo,
    /// Number of bytes written.
    /// NOTE(review): the accounting window is not visible here — confirm.
    #[serde(default)]
    pub written_bytes: u64,
    /// Latest entry id of the data topic.
    #[serde(default)]
    pub data_topic_latest_entry_id: u64,
    /// Latest entry id of the metadata topic.
    #[serde(default)]
    pub metadata_topic_latest_entry_id: u64,
}
508
/// Manifest information of a region, with one variant per engine kind.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum RegionManifestInfo {
    /// Manifest information with a single manifest.
    Mito {
        /// Version of the manifest.
        manifest_version: u64,
        /// Flushed entry id.
        flushed_entry_id: u64,
        /// Count of removed files.
        /// NOTE(review): exact counting rule is not visible here — confirm.
        file_removed_cnt: u64,
    },
    /// Manifest information with separate data and metadata manifests.
    Metric {
        /// Version of the data manifest.
        data_manifest_version: u64,
        /// Flushed entry id of the data part.
        data_flushed_entry_id: u64,
        /// Version of the metadata manifest.
        metadata_manifest_version: u64,
        /// Flushed entry id of the metadata part.
        metadata_flushed_entry_id: u64,
    },
}
525
526impl RegionManifestInfo {
527 pub fn mito(manifest_version: u64, flushed_entry_id: u64, file_removal_rate: u64) -> Self {
529 Self::Mito {
530 manifest_version,
531 flushed_entry_id,
532 file_removed_cnt: file_removal_rate,
533 }
534 }
535
536 pub fn metric(
538 data_manifest_version: u64,
539 data_flushed_entry_id: u64,
540 metadata_manifest_version: u64,
541 metadata_flushed_entry_id: u64,
542 ) -> Self {
543 Self::Metric {
544 data_manifest_version,
545 data_flushed_entry_id,
546 metadata_manifest_version,
547 metadata_flushed_entry_id,
548 }
549 }
550
551 pub fn is_mito(&self) -> bool {
553 matches!(self, RegionManifestInfo::Mito { .. })
554 }
555
556 pub fn is_metric(&self) -> bool {
558 matches!(self, RegionManifestInfo::Metric { .. })
559 }
560
561 pub fn data_flushed_entry_id(&self) -> u64 {
563 match self {
564 RegionManifestInfo::Mito {
565 flushed_entry_id, ..
566 } => *flushed_entry_id,
567 RegionManifestInfo::Metric {
568 data_flushed_entry_id,
569 ..
570 } => *data_flushed_entry_id,
571 }
572 }
573
574 pub fn data_manifest_version(&self) -> u64 {
576 match self {
577 RegionManifestInfo::Mito {
578 manifest_version, ..
579 } => *manifest_version,
580 RegionManifestInfo::Metric {
581 data_manifest_version,
582 ..
583 } => *data_manifest_version,
584 }
585 }
586
587 pub fn metadata_manifest_version(&self) -> Option<u64> {
589 match self {
590 RegionManifestInfo::Mito { .. } => None,
591 RegionManifestInfo::Metric {
592 metadata_manifest_version,
593 ..
594 } => Some(*metadata_manifest_version),
595 }
596 }
597
598 pub fn metadata_flushed_entry_id(&self) -> Option<u64> {
600 match self {
601 RegionManifestInfo::Mito { .. } => None,
602 RegionManifestInfo::Metric {
603 metadata_flushed_entry_id,
604 ..
605 } => Some(*metadata_flushed_entry_id),
606 }
607 }
608
609 pub fn encode_list(manifest_infos: &[(RegionId, Self)]) -> serde_json::Result<Vec<u8>> {
611 serde_json::to_vec(manifest_infos)
612 }
613
614 pub fn decode_list(value: &[u8]) -> serde_json::Result<Vec<(RegionId, Self)>> {
616 serde_json::from_slice(value)
617 }
618}
619
620impl Default for RegionManifestInfo {
621 fn default() -> Self {
622 Self::Mito {
623 manifest_version: 0,
624 flushed_entry_id: 0,
625 file_removed_cnt: 0,
626 }
627 }
628}
629
630impl RegionStatistic {
631 pub fn deserialize_from_slice(value: &[u8]) -> Option<RegionStatistic> {
635 serde_json::from_slice(value).ok()
636 }
637
638 pub fn serialize_to_vec(&self) -> Option<Vec<u8>> {
642 serde_json::to_vec(self).ok()
643 }
644}
645
646impl RegionStatistic {
647 pub fn estimated_disk_size(&self) -> u64 {
649 self.wal_size + self.sst_size + self.manifest_size + self.index_size
650 }
651}
652
/// A request describing where a region should be synced from.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum SyncRegionFromRequest {
    /// Sync using the given manifest information.
    FromManifest(RegionManifestInfo),
    /// Sync from another region.
    FromRegion {
        /// Id of the region to sync from.
        source_region_id: RegionId,
        /// Parallelism to use for the sync.
        /// NOTE(review): what is parallelised is engine-specific — confirm.
        parallelism: usize,
    },
}
671
672impl From<RegionManifestInfo> for SyncRegionFromRequest {
673 fn from(manifest_info: RegionManifestInfo) -> Self {
674 SyncRegionFromRequest::FromManifest(manifest_info)
675 }
676}
677
678impl SyncRegionFromRequest {
679 pub fn from_manifest(manifest_info: RegionManifestInfo) -> Self {
681 SyncRegionFromRequest::FromManifest(manifest_info)
682 }
683
684 pub fn from_region(source_region_id: RegionId, parallelism: usize) -> Self {
686 SyncRegionFromRequest::FromRegion {
687 source_region_id,
688 parallelism,
689 }
690 }
691
692 pub fn is_from_manifest(&self) -> bool {
694 matches!(self, SyncRegionFromRequest::FromManifest { .. })
695 }
696
697 pub fn into_region_manifest_info(self) -> Option<RegionManifestInfo> {
701 match self {
702 SyncRegionFromRequest::FromManifest(manifest_info) => Some(manifest_info),
703 SyncRegionFromRequest::FromRegion { .. } => None,
704 }
705 }
706}
707
/// Result of a region sync, with one variant per engine kind.
#[derive(Debug)]
pub enum SyncRegionFromResponse {
    /// The engine does not support syncing.
    NotSupported,
    /// Result carrying a single sync flag.
    Mito {
        /// Whether the region was synced.
        synced: bool,
    },
    /// Result carrying separate flags for metadata and data.
    Metric {
        /// Whether the metadata part was synced.
        metadata_synced: bool,
        /// Whether the data part was synced.
        data_synced: bool,
        /// Ids of the logical regions newly opened by the sync.
        new_opened_logical_region_ids: Vec<RegionId>,
    },
}
726
727impl SyncRegionFromResponse {
728 pub fn is_data_synced(&self) -> bool {
730 match self {
731 SyncRegionFromResponse::NotSupported => false,
732 SyncRegionFromResponse::Mito { synced } => *synced,
733 SyncRegionFromResponse::Metric { data_synced, .. } => *data_synced,
734 }
735 }
736
737 pub fn is_mito(&self) -> bool {
739 matches!(self, SyncRegionFromResponse::Mito { .. })
740 }
741
742 pub fn is_metric(&self) -> bool {
744 matches!(self, SyncRegionFromResponse::Metric { .. })
745 }
746
747 pub fn new_opened_logical_region_ids(self) -> Option<Vec<RegionId>> {
749 match self {
750 SyncRegionFromResponse::Metric {
751 new_opened_logical_region_ids,
752 ..
753 } => Some(new_opened_logical_region_ids),
754 _ => None,
755 }
756 }
757}
758
/// A request to remap manifests between regions.
#[derive(Debug, Clone)]
pub struct RemapManifestsRequest {
    /// Id of the region the request is addressed to.
    pub region_id: RegionId,
    /// Regions whose manifests serve as input.
    pub input_regions: Vec<RegionId>,
    /// Mapping from a region to a list of regions.
    /// NOTE(review): presumably source region -> target regions — confirm direction.
    pub region_mapping: HashMap<RegionId, Vec<RegionId>>,
    /// New partition expressions keyed by region id.
    /// NOTE(review): string encoding of the expression is not visible here — confirm.
    pub new_partition_exprs: HashMap<RegionId, String>,
}
771
/// Response of a manifest remap.
#[derive(Debug, Clone)]
pub struct RemapManifestsResponse {
    /// Manifest paths keyed by region id.
    /// NOTE(review): presumably the paths of the newly written manifests — confirm.
    pub manifest_paths: HashMap<RegionId, String>,
}
781
/// A request to copy a region from another region.
#[derive(Debug, Clone)]
pub struct MitoCopyRegionFromRequest {
    /// Id of the region to copy from.
    pub source_region_id: RegionId,
    /// Parallelism to use for the copy.
    pub parallelism: usize,
}
790
/// Response of a region copy.
#[derive(Debug, Clone)]
pub struct MitoCopyRegionFromResponse {
    /// Ids of the files that were copied.
    pub copied_file_ids: Vec<FileId>,
}
796
797#[async_trait]
798pub trait RegionEngine: Send + Sync {
799 fn name(&self) -> &str;
801
802 async fn handle_batch_open_requests(
804 &self,
805 parallelism: usize,
806 requests: Vec<(RegionId, RegionOpenRequest)>,
807 ) -> Result<BatchResponses, BoxedError> {
808 let semaphore = Arc::new(Semaphore::new(parallelism));
809 let mut tasks = Vec::with_capacity(requests.len());
810
811 for (region_id, request) in requests {
812 let semaphore_moved = semaphore.clone();
813
814 tasks.push(async move {
815 let _permit = semaphore_moved.acquire().await.unwrap();
817 let result = self
818 .handle_request(region_id, RegionRequest::Open(request))
819 .await;
820 (region_id, result)
821 });
822 }
823
824 Ok(join_all(tasks).await)
825 }
826
827 async fn handle_batch_catchup_requests(
828 &self,
829 parallelism: usize,
830 requests: Vec<(RegionId, RegionCatchupRequest)>,
831 ) -> Result<BatchResponses, BoxedError> {
832 let semaphore = Arc::new(Semaphore::new(parallelism));
833 let mut tasks = Vec::with_capacity(requests.len());
834
835 for (region_id, request) in requests {
836 let semaphore_moved = semaphore.clone();
837
838 tasks.push(async move {
839 let _permit = semaphore_moved.acquire().await.unwrap();
841 let result = self
842 .handle_request(region_id, RegionRequest::Catchup(request))
843 .await;
844 (region_id, result)
845 });
846 }
847
848 Ok(join_all(tasks).await)
849 }
850
851 async fn handle_batch_ddl_requests(
852 &self,
853 request: BatchRegionDdlRequest,
854 ) -> Result<RegionResponse, BoxedError> {
855 let requests = request.into_region_requests();
856
857 let mut affected_rows = 0;
858 let mut extensions = HashMap::new();
859
860 for (region_id, request) in requests {
861 let result = self.handle_request(region_id, request).await?;
862 affected_rows += result.affected_rows;
863 extensions.extend(result.extensions);
864 }
865
866 Ok(RegionResponse {
867 affected_rows,
868 extensions,
869 metadata: Vec::new(),
870 })
871 }
872
873 async fn handle_request(
875 &self,
876 region_id: RegionId,
877 request: RegionRequest,
878 ) -> Result<RegionResponse, BoxedError>;
879
880 async fn get_committed_sequence(
882 &self,
883 region_id: RegionId,
884 ) -> Result<SequenceNumber, BoxedError>;
885
886 async fn handle_query(
888 &self,
889 region_id: RegionId,
890 request: ScanRequest,
891 ) -> Result<RegionScannerRef, BoxedError>;
892
893 fn query_memory_tracker(&self) -> Option<QueryMemoryTracker> {
895 None
896 }
897
898 async fn get_metadata(&self, region_id: RegionId) -> Result<RegionMetadataRef, BoxedError>;
900
901 fn region_statistic(&self, region_id: RegionId) -> Option<RegionStatistic>;
903
904 async fn stop(&self) -> Result<(), BoxedError>;
906
907 fn set_region_role(&self, region_id: RegionId, role: RegionRole) -> Result<(), BoxedError>;
913
914 async fn sync_region(
916 &self,
917 region_id: RegionId,
918 request: SyncRegionFromRequest,
919 ) -> Result<SyncRegionFromResponse, BoxedError>;
920
921 async fn remap_manifests(
923 &self,
924 request: RemapManifestsRequest,
925 ) -> Result<RemapManifestsResponse, BoxedError>;
926
927 async fn set_region_role_state_gracefully(
931 &self,
932 region_id: RegionId,
933 region_role_state: SettableRegionRoleState,
934 ) -> Result<SetRegionRoleStateResponse, BoxedError>;
935
936 fn role(&self, region_id: RegionId) -> Option<RegionRole>;
940
941 fn as_any(&self) -> &dyn Any;
942}
943
/// A shared, reference-counted [`RegionEngine`] trait object.
pub type RegionEngineRef = Arc<dyn RegionEngine>;
945
/// A [`RegionScanner`] backed by a single, pre-built record batch stream.
pub struct SinglePartitionScanner {
    /// The wrapped stream; taken out on the first scan, leaving `None` behind.
    stream: Mutex<Option<SendableRecordBatchStream>>,
    /// Schema of the wrapped stream, captured at construction.
    schema: SchemaRef,
    /// Properties reported by the scanner.
    properties: ScannerProperties,
    /// Metadata of the scanned region.
    metadata: RegionMetadataRef,
    /// Snapshot sequence reported by the scanner, if any.
    snapshot_sequence: Option<SequenceNumber>,
}
954
955impl SinglePartitionScanner {
956 pub fn new(
958 stream: SendableRecordBatchStream,
959 append_mode: bool,
960 metadata: RegionMetadataRef,
961 snapshot_sequence: Option<SequenceNumber>,
962 ) -> Self {
963 let schema = stream.schema();
964 Self {
965 stream: Mutex::new(Some(stream)),
966 schema,
967 properties: ScannerProperties::default().with_append_mode(append_mode),
968 metadata,
969 snapshot_sequence,
970 }
971 }
972}
973
974impl Debug for SinglePartitionScanner {
975 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
976 write!(f, "SinglePartitionScanner: <SendableRecordBatchStream>")
977 }
978}
979
980impl RegionScanner for SinglePartitionScanner {
981 fn name(&self) -> &str {
982 "SinglePartition"
983 }
984
985 fn properties(&self) -> &ScannerProperties {
986 &self.properties
987 }
988
989 fn schema(&self) -> SchemaRef {
990 self.schema.clone()
991 }
992
993 fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> {
994 self.properties.prepare(request);
995 Ok(())
996 }
997
998 fn scan_partition(
999 &self,
1000 _ctx: &QueryScanContext,
1001 _metrics_set: &ExecutionPlanMetricsSet,
1002 _partition: usize,
1003 ) -> Result<SendableRecordBatchStream, BoxedError> {
1004 let mut stream = self.stream.lock().unwrap();
1005 let result = stream
1006 .take()
1007 .or_else(|| Some(Box::pin(EmptyRecordBatchStream::new(self.schema.clone()))));
1008 Ok(result.unwrap())
1009 }
1010
1011 fn has_predicate_without_region(&self) -> bool {
1012 false
1013 }
1014
1015 fn add_dyn_filter_to_predicate(
1016 &mut self,
1017 filter_exprs: Vec<Arc<dyn datafusion_physical_plan::PhysicalExpr>>,
1018 ) -> Vec<bool> {
1019 vec![false; filter_exprs.len()]
1020 }
1021
1022 fn metadata(&self) -> RegionMetadataRef {
1023 self.metadata.clone()
1024 }
1025
1026 fn set_logical_region(&mut self, logical_region: bool) {
1027 self.properties.set_logical_region(logical_region);
1028 }
1029
1030 fn snapshot_sequence(&self) -> Option<SequenceNumber> {
1031 self.snapshot_sequence
1032 }
1033}
1034
1035impl DisplayAs for SinglePartitionScanner {
1036 fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
1037 write!(f, "{:?}", self)
1038 }
1039}