common_recordbatch/
lib.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#![feature(never_type)]
16
17pub mod adapter;
18pub mod cursor;
19pub mod error;
20pub mod filter;
21mod recordbatch;
22pub mod util;
23
24use std::pin::Pin;
25use std::sync::Arc;
26
27use adapter::RecordBatchMetrics;
28use arc_swap::ArcSwapOption;
29pub use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
30use datatypes::arrow::compute::SortOptions;
31pub use datatypes::arrow::record_batch::RecordBatch as DfRecordBatch;
32use datatypes::arrow::util::pretty;
33use datatypes::prelude::VectorRef;
34use datatypes::schema::{Schema, SchemaRef};
35use error::Result;
36use futures::task::{Context, Poll};
37use futures::{Stream, TryStreamExt};
38pub use recordbatch::RecordBatch;
39use snafu::{ensure, ResultExt};
40
41pub trait RecordBatchStream: Stream<Item = Result<RecordBatch>> {
42    fn name(&self) -> &str {
43        "RecordBatchStream"
44    }
45
46    fn schema(&self) -> SchemaRef;
47
48    fn output_ordering(&self) -> Option<&[OrderOption]>;
49
50    fn metrics(&self) -> Option<RecordBatchMetrics>;
51}
52
53pub type SendableRecordBatchStream = Pin<Box<dyn RecordBatchStream + Send>>;
54
55#[derive(Debug, Clone, PartialEq, Eq)]
56pub struct OrderOption {
57    pub name: String,
58    pub options: SortOptions,
59}
60
61/// EmptyRecordBatchStream can be used to create a RecordBatchStream
62/// that will produce no results
63pub struct EmptyRecordBatchStream {
64    /// Schema wrapped by Arc
65    schema: SchemaRef,
66}
67
68impl EmptyRecordBatchStream {
69    /// Create an empty RecordBatchStream
70    pub fn new(schema: SchemaRef) -> Self {
71        Self { schema }
72    }
73}
74
75impl RecordBatchStream for EmptyRecordBatchStream {
76    fn schema(&self) -> SchemaRef {
77        self.schema.clone()
78    }
79
80    fn output_ordering(&self) -> Option<&[OrderOption]> {
81        None
82    }
83
84    fn metrics(&self) -> Option<RecordBatchMetrics> {
85        None
86    }
87}
88
89impl Stream for EmptyRecordBatchStream {
90    type Item = Result<RecordBatch>;
91
92    fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
93        Poll::Ready(None)
94    }
95}
96
97#[derive(Debug, PartialEq)]
98pub struct RecordBatches {
99    schema: SchemaRef,
100    batches: Vec<RecordBatch>,
101}
102
103impl RecordBatches {
104    pub fn try_from_columns<I: IntoIterator<Item = VectorRef>>(
105        schema: SchemaRef,
106        columns: I,
107    ) -> Result<Self> {
108        let batches = vec![RecordBatch::new(schema.clone(), columns)?];
109        Ok(Self { schema, batches })
110    }
111
112    pub async fn try_collect(stream: SendableRecordBatchStream) -> Result<Self> {
113        let schema = stream.schema();
114        let batches = stream.try_collect::<Vec<_>>().await?;
115        Ok(Self { schema, batches })
116    }
117
118    #[inline]
119    pub fn empty() -> Self {
120        Self {
121            schema: Arc::new(Schema::new(vec![])),
122            batches: vec![],
123        }
124    }
125
126    pub fn iter(&self) -> impl Iterator<Item = &RecordBatch> {
127        self.batches.iter()
128    }
129
130    pub fn pretty_print(&self) -> Result<String> {
131        let df_batches = &self
132            .iter()
133            .map(|x| x.df_record_batch().clone())
134            .collect::<Vec<_>>();
135        let result = pretty::pretty_format_batches(df_batches).context(error::FormatSnafu)?;
136
137        Ok(result.to_string())
138    }
139
140    pub fn try_new(schema: SchemaRef, batches: Vec<RecordBatch>) -> Result<Self> {
141        for batch in &batches {
142            ensure!(
143                batch.schema == schema,
144                error::CreateRecordBatchesSnafu {
145                    reason: format!(
146                        "expect RecordBatch schema equals {:?}, actual: {:?}",
147                        schema, batch.schema
148                    )
149                }
150            )
151        }
152        Ok(Self { schema, batches })
153    }
154
155    pub fn schema(&self) -> SchemaRef {
156        self.schema.clone()
157    }
158
159    pub fn take(self) -> Vec<RecordBatch> {
160        self.batches
161    }
162
163    pub fn as_stream(&self) -> SendableRecordBatchStream {
164        Box::pin(SimpleRecordBatchStream {
165            inner: RecordBatches {
166                schema: self.schema(),
167                batches: self.batches.clone(),
168            },
169            index: 0,
170        })
171    }
172}
173
174impl IntoIterator for RecordBatches {
175    type Item = RecordBatch;
176    type IntoIter = std::vec::IntoIter<Self::Item>;
177
178    fn into_iter(self) -> Self::IntoIter {
179        self.batches.into_iter()
180    }
181}
182
183pub struct SimpleRecordBatchStream {
184    inner: RecordBatches,
185    index: usize,
186}
187
188impl RecordBatchStream for SimpleRecordBatchStream {
189    fn schema(&self) -> SchemaRef {
190        self.inner.schema()
191    }
192
193    fn output_ordering(&self) -> Option<&[OrderOption]> {
194        None
195    }
196
197    fn metrics(&self) -> Option<RecordBatchMetrics> {
198        None
199    }
200}
201
202impl Stream for SimpleRecordBatchStream {
203    type Item = Result<RecordBatch>;
204
205    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
206        Poll::Ready(if self.index < self.inner.batches.len() {
207            let batch = self.inner.batches[self.index].clone();
208            self.index += 1;
209            Some(Ok(batch))
210        } else {
211            None
212        })
213    }
214}
215
216/// Adapt a [Stream] of [RecordBatch] to a [RecordBatchStream].
217pub struct RecordBatchStreamWrapper<S> {
218    pub schema: SchemaRef,
219    pub stream: S,
220    pub output_ordering: Option<Vec<OrderOption>>,
221    pub metrics: Arc<ArcSwapOption<RecordBatchMetrics>>,
222}
223
224impl<S> RecordBatchStreamWrapper<S> {
225    /// Creates a [RecordBatchStreamWrapper] without output ordering requirement.
226    pub fn new(schema: SchemaRef, stream: S) -> RecordBatchStreamWrapper<S> {
227        RecordBatchStreamWrapper {
228            schema,
229            stream,
230            output_ordering: None,
231            metrics: Default::default(),
232        }
233    }
234}
235
236impl<S: Stream<Item = Result<RecordBatch>> + Unpin> RecordBatchStream
237    for RecordBatchStreamWrapper<S>
238{
239    fn name(&self) -> &str {
240        "RecordBatchStreamWrapper"
241    }
242
243    fn schema(&self) -> SchemaRef {
244        self.schema.clone()
245    }
246
247    fn output_ordering(&self) -> Option<&[OrderOption]> {
248        self.output_ordering.as_deref()
249    }
250
251    fn metrics(&self) -> Option<RecordBatchMetrics> {
252        self.metrics.load().as_ref().map(|s| s.as_ref().clone())
253    }
254}
255
256impl<S: Stream<Item = Result<RecordBatch>> + Unpin> Stream for RecordBatchStreamWrapper<S> {
257    type Item = Result<RecordBatch>;
258
259    fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
260        Pin::new(&mut self.stream).poll_next(ctx)
261    }
262}
263
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use datatypes::prelude::{ConcreteDataType, VectorRef};
    use datatypes::schema::{ColumnSchema, Schema};
    use datatypes::vectors::{BooleanVector, Int32Vector, StringVector};

    use super::*;

    /// Builds a non-nullable column schema named `name` of type `data_type`.
    fn non_null_column(name: &'static str, data_type: ConcreteDataType) -> ColumnSchema {
        ColumnSchema::new(name, data_type, false)
    }

    #[test]
    fn test_recordbatches_try_from_columns() {
        let schema = Arc::new(Schema::new(vec![non_null_column(
            "a",
            ConcreteDataType::int32_datatype(),
        )]));

        // A string column cannot populate an int32 schema.
        let strings: VectorRef = Arc::new(StringVector::from(vec!["hello", "world"]));
        assert!(RecordBatches::try_from_columns(schema.clone(), vec![strings]).is_err());

        let ints: VectorRef = Arc::new(Int32Vector::from_slice([1, 2]));
        let expected = vec![RecordBatch::new(schema.clone(), vec![ints.clone()]).unwrap()];
        let collection = RecordBatches::try_from_columns(schema, vec![ints]).unwrap();
        assert_eq!(collection.take(), expected);
    }

    #[test]
    fn test_recordbatches_try_new() {
        let col_a = non_null_column("a", ConcreteDataType::int32_datatype());
        let col_b = non_null_column("b", ConcreteDataType::string_datatype());
        let col_c = non_null_column("c", ConcreteDataType::boolean_datatype());

        let ints: VectorRef = Arc::new(Int32Vector::from_slice([1, 2]));
        let strings: VectorRef = Arc::new(StringVector::from(vec!["hello", "world"]));
        let bools: VectorRef = Arc::new(BooleanVector::from(vec![true, false]));

        let ab_schema = Arc::new(Schema::new(vec![col_a.clone(), col_b]));
        let ab_batch = RecordBatch::new(ab_schema.clone(), vec![ints.clone(), strings]).unwrap();

        let ac_schema = Arc::new(Schema::new(vec![col_a, col_c]));
        let ac_batch = RecordBatch::new(ac_schema.clone(), vec![ints, bools]).unwrap();

        // Mixing batches with different schemas is rejected with a descriptive error.
        let mixed = RecordBatches::try_new(ab_schema.clone(), vec![ab_batch.clone(), ac_batch]);
        assert!(mixed.is_err());
        assert_eq!(
            mixed.unwrap_err().to_string(),
            format!(
                "Failed to create RecordBatches, reason: expect RecordBatch schema equals {ab_schema:?}, actual: {ac_schema:?}",
            )
        );

        let collection = RecordBatches::try_new(ab_schema.clone(), vec![ab_batch.clone()]).unwrap();
        let expected = "\
+---+-------+
| a | b     |
+---+-------+
| 1 | hello |
| 2 | world |
+---+-------+";
        assert_eq!(collection.pretty_print().unwrap(), expected);

        assert_eq!(ab_schema, collection.schema());
        assert_eq!(vec![ab_batch], collection.take());
    }

    #[tokio::test]
    async fn test_simple_recordbatch_stream() {
        let schema = Arc::new(Schema::new(vec![
            non_null_column("a", ConcreteDataType::int32_datatype()),
            non_null_column("b", ConcreteDataType::string_datatype()),
        ]));

        let first_ints: VectorRef = Arc::new(Int32Vector::from_slice([1, 2]));
        let first_strings: VectorRef = Arc::new(StringVector::from(vec!["a", "b"]));
        let first = RecordBatch::new(schema.clone(), vec![first_ints, first_strings]).unwrap();

        let second_ints: VectorRef = Arc::new(Int32Vector::from_slice([3, 4, 5]));
        let second_strings: VectorRef = Arc::new(StringVector::from(vec!["c", "d", "e"]));
        let second = RecordBatch::new(schema.clone(), vec![second_ints, second_strings]).unwrap();

        let collection =
            RecordBatches::try_new(schema.clone(), vec![first.clone(), second.clone()]).unwrap();
        let replayed = util::collect(collection.as_stream()).await.unwrap();

        // The stream must replay every batch, in order.
        assert_eq!(replayed.len(), 2);
        assert_eq!(replayed[0], first);
        assert_eq!(replayed[1], second);
    }
}