common_query/stream.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::any::Any;
use std::fmt::{Debug, Formatter};
use std::sync::{Arc, Mutex};

use common_recordbatch::adapter::DfRecordBatchStreamAdapter;
use common_recordbatch::SendableRecordBatchStream;
use datafusion::execution::context::TaskContext;
use datafusion::execution::SendableRecordBatchStream as DfSendableRecordBatchStream;
use datafusion::physical_expr::{EquivalenceProperties, Partitioning, PhysicalSortExpr};
use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
use datafusion_common::DataFusionError;
use datatypes::arrow::datatypes::SchemaRef as ArrowSchemaRef;
use datatypes::schema::SchemaRef;

/// Adapts greptime's [SendableRecordBatchStream] to DataFusion's [ExecutionPlan].
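///
/// The wrapped stream can be executed at most once: the first call to
/// [ExecutionPlan::execute] takes it, and later calls return a
/// "Stream already exhausted" error. A minimal usage sketch (not run as a
/// doctest; `recordbatches` and `ctx` stand for a `RecordBatches` value and
/// a `SessionContext`, as in the test at the bottom of this file):
///
/// ```ignore
/// let scan = StreamScanAdapter::new(recordbatches.as_stream());
/// let df_stream = scan.execute(0, ctx.task_ctx())?;
/// ```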
pub struct StreamScanAdapter {
    stream: Mutex<Option<SendableRecordBatchStream>>,
    schema: SchemaRef,
    arrow_schema: ArrowSchemaRef,
    properties: PlanProperties,
    output_ordering: Option<Vec<PhysicalSortExpr>>,
}

impl Debug for StreamScanAdapter {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("StreamScanAdapter")
            .field("stream", &"<SendableRecordBatchStream>")
            .field("schema", &self.schema)
            .finish()
    }
}

impl StreamScanAdapter {
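    /// Creates an adapter over `stream`. The resulting plan reports a single
    /// (unknown) partition, incremental emission, and bounded input.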
    pub fn new(stream: SendableRecordBatchStream) -> Self {
        let schema = stream.schema();
        let arrow_schema = schema.arrow_schema().clone();
        let properties = PlanProperties::new(
            EquivalenceProperties::new(arrow_schema.clone()),
            Partitioning::UnknownPartitioning(1),
            EmissionType::Incremental,
            Boundedness::Bounded,
        );

        Self {
            stream: Mutex::new(Some(stream)),
            schema,
            arrow_schema,
            properties,
            output_ordering: None,
        }
    }

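    /// Records an optional output ordering on the adapter. Note that the
    /// ordering is only stored here; it is not folded into the plan's
    /// [EquivalenceProperties].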
    pub fn with_output_ordering(mut self, output_ordering: Option<Vec<PhysicalSortExpr>>) -> Self {
        self.output_ordering = output_ordering;
        self
    }
}

impl DisplayAs for StreamScanAdapter {
    fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
        write!(
            f,
            "StreamScanAdapter: [<SendableRecordBatchStream>], schema: ["
        )?;
        write!(f, "{:?}", &self.arrow_schema)?;
        write!(f, "]")
    }
}

impl ExecutionPlan for StreamScanAdapter {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema(&self) -> ArrowSchemaRef {
        self.arrow_schema.clone()
    }

    fn properties(&self) -> &PlanProperties {
        &self.properties
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![]
    }

    // DataFusion will swap children unconditionally,
    // but since this node is a leaf node, it's safe to just return self.
    fn with_new_children(
        self: Arc<Self>,
        _children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
        Ok(self)
    }

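    // The wrapped stream is taken out of the mutex on the first call, so this
    // plan can be executed at most once and effectively exposes a single
    // partition; later calls hit the "Stream already exhausted" error below.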
    fn execute(
        &self,
        _partition: usize,
        _context: Arc<TaskContext>,
    ) -> datafusion_common::Result<DfSendableRecordBatchStream> {
        let mut stream = self.stream.lock().unwrap();
        let stream = stream
            .take()
            .ok_or_else(|| DataFusionError::Execution("Stream already exhausted".to_string()))?;
        Ok(Box::pin(DfRecordBatchStreamAdapter::new(stream)))
    }

    fn name(&self) -> &str {
        "StreamScanAdapter"
    }
}

#[cfg(test)]
mod test {
    use common_recordbatch::{RecordBatch, RecordBatches};
    use datafusion::prelude::SessionContext;
    use datatypes::data_type::ConcreteDataType;
    use datatypes::schema::{ColumnSchema, Schema};
    use datatypes::vectors::Int32Vector;
    use futures_util::TryStreamExt;

    use super::*;

    #[tokio::test]
    async fn test_simple_table_scan() {
        let ctx = SessionContext::new();
        let schema = Arc::new(Schema::new(vec![ColumnSchema::new(
            "a",
            ConcreteDataType::int32_datatype(),
            false,
        )]));

        let batch1 = RecordBatch::new(
            schema.clone(),
            vec![Arc::new(Int32Vector::from_slice([1, 2])) as _],
        )
        .unwrap();
        let batch2 = RecordBatch::new(
            schema.clone(),
            vec![Arc::new(Int32Vector::from_slice([3, 4, 5])) as _],
        )
        .unwrap();

        let recordbatches =
            RecordBatches::try_new(schema.clone(), vec![batch1.clone(), batch2.clone()]).unwrap();
        let stream = recordbatches.as_stream();

        let scan = StreamScanAdapter::new(stream);

        assert_eq!(scan.schema(), schema.arrow_schema().clone());

        let stream = scan.execute(0, ctx.task_ctx()).unwrap();
        let recordbatches = stream.try_collect::<Vec<_>>().await.unwrap();
        assert_eq!(recordbatches[0], batch1.into_df_record_batch());
        assert_eq!(recordbatches[1], batch2.into_df_record_batch());

        // The stream has been consumed; a second execution must fail.
        let result = scan.execute(0, ctx.task_ctx());
        assert!(result.is_err());
        match result {
            Err(e) => assert!(e.to_string().contains("Stream already exhausted")),
            _ => unreachable!(),
        }
    }
}
179}