use std::any::Any;
use std::fmt::{Debug, Formatter};
use std::sync::{Arc, Mutex};

use common_recordbatch::adapter::DfRecordBatchStreamAdapter;
use common_recordbatch::SendableRecordBatchStream;
use datafusion::execution::context::TaskContext;
use datafusion::execution::SendableRecordBatchStream as DfSendableRecordBatchStream;
use datafusion::physical_expr::{EquivalenceProperties, Partitioning, PhysicalSortExpr};
use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
use datafusion_common::DataFusionError;
use datatypes::arrow::datatypes::SchemaRef as ArrowSchemaRef;
use datatypes::schema::SchemaRef;

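/// Adapts a [`SendableRecordBatchStream`] into a DataFusion [`ExecutionPlan`]
/// leaf node, so a ready-made record batch stream can be scanned by DataFusion.
/// The wrapped stream is moved out on the first call to `execute`, which makes
/// the plan single-use: executing it a second time returns an error.
///
/// A minimal usage sketch (mirroring the test below; `stream` and `ctx` are
/// assumed to be an existing `SendableRecordBatchStream` and `SessionContext`):
///
/// ```ignore
/// let scan = StreamScanAdapter::new(stream);
/// let df_stream = scan.execute(0, ctx.task_ctx())?;
/// ```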
pub struct StreamScanAdapter {
    stream: Mutex<Option<SendableRecordBatchStream>>,
    schema: SchemaRef,
    arrow_schema: ArrowSchemaRef,
    properties: PlanProperties,
    output_ordering: Option<Vec<PhysicalSortExpr>>,
}

impl Debug for StreamScanAdapter {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("StreamScanAdapter")
            .field("stream", &"<SendableRecordBatchStream>")
            .field("schema", &self.schema)
            .finish()
    }
}

impl StreamScanAdapter {
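    /// Creates an adapter over `stream`. The plan advertises a single unknown
    /// partition, incremental emission, and bounded output.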
    pub fn new(stream: SendableRecordBatchStream) -> Self {
        let schema = stream.schema();
        let arrow_schema = schema.arrow_schema().clone();
        let properties = PlanProperties::new(
            EquivalenceProperties::new(arrow_schema.clone()),
            Partitioning::UnknownPartitioning(1),
            EmissionType::Incremental,
            Boundedness::Bounded,
        );

        Self {
            stream: Mutex::new(Some(stream)),
            schema,
            arrow_schema,
            properties,
            output_ordering: None,
        }
    }

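    /// Attaches an optional output ordering to this adapter.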
    pub fn with_output_ordering(mut self, output_ordering: Option<Vec<PhysicalSortExpr>>) -> Self {
        self.output_ordering = output_ordering;
        self
    }
}

impl DisplayAs for StreamScanAdapter {
    fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
        write!(
            f,
            "StreamScanAdapter: [<SendableRecordBatchStream>], schema: ["
        )?;
        write!(f, "{:?}", &self.arrow_schema)?;
        write!(f, "]")
    }
}

impl ExecutionPlan for StreamScanAdapter {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema(&self) -> ArrowSchemaRef {
        self.arrow_schema.clone()
    }

    fn properties(&self) -> &PlanProperties {
        &self.properties
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![]
    }

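    // Leaf node: there are no children to replace, so return the plan as-is.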
    fn with_new_children(
        self: Arc<Self>,
        _children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
        Ok(self.clone())
    }

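    // The stream is taken out of the mutex here, so `execute` succeeds only
    // once; later calls find `None` and fail with "Stream already exhausted".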
    fn execute(
        &self,
        _partition: usize,
        _context: Arc<TaskContext>,
    ) -> datafusion_common::Result<DfSendableRecordBatchStream> {
        let mut stream = self.stream.lock().unwrap();
        let stream = stream
            .take()
            .ok_or_else(|| DataFusionError::Execution("Stream already exhausted".to_string()))?;
        Ok(Box::pin(DfRecordBatchStreamAdapter::new(stream)))
    }

    fn name(&self) -> &str {
        "StreamScanAdapter"
    }
}

#[cfg(test)]
mod test {
    use common_recordbatch::{RecordBatch, RecordBatches};
    use datafusion::prelude::SessionContext;
    use datatypes::data_type::ConcreteDataType;
    use datatypes::schema::{ColumnSchema, Schema};
    use datatypes::vectors::Int32Vector;
    use futures_util::TryStreamExt;

    use super::*;

    #[tokio::test]
    async fn test_simple_table_scan() {
        let ctx = SessionContext::new();
        let schema = Arc::new(Schema::new(vec![ColumnSchema::new(
            "a",
            ConcreteDataType::int32_datatype(),
            false,
        )]));

        let batch1 = RecordBatch::new(
            schema.clone(),
            vec![Arc::new(Int32Vector::from_slice([1, 2])) as _],
        )
        .unwrap();
        let batch2 = RecordBatch::new(
            schema.clone(),
            vec![Arc::new(Int32Vector::from_slice([3, 4, 5])) as _],
        )
        .unwrap();

        let recordbatches =
            RecordBatches::try_new(schema.clone(), vec![batch1.clone(), batch2.clone()]).unwrap();
        let stream = recordbatches.as_stream();

        let scan = StreamScanAdapter::new(stream);

        assert_eq!(scan.schema(), schema.arrow_schema().clone());

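        // The first execution takes the stream and yields both batches in order.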
        let stream = scan.execute(0, ctx.task_ctx()).unwrap();
        let recordbatches = stream.try_collect::<Vec<_>>().await.unwrap();
        assert_eq!(recordbatches[0], batch1.into_df_record_batch());
        assert_eq!(recordbatches[1], batch2.into_df_record_batch());

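        // A second execution must fail: the stream was already taken above.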
        let result = scan.execute(0, ctx.task_ctx());
        assert!(result.is_err());
        match result {
            Err(e) => assert!(e.to_string().contains("Stream already exhausted")),
            _ => unreachable!(),
        }
    }
}