common_recordbatch/lib.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#![feature(never_type)]

pub mod adapter;
pub mod cursor;
pub mod error;
pub mod filter;
mod recordbatch;
pub mod util;

use std::pin::Pin;
use std::sync::Arc;

use adapter::RecordBatchMetrics;
use arc_swap::ArcSwapOption;
pub use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
use datatypes::arrow::compute::SortOptions;
pub use datatypes::arrow::record_batch::RecordBatch as DfRecordBatch;
use datatypes::arrow::util::pretty;
use datatypes::prelude::{ConcreteDataType, VectorRef};
use datatypes::scalars::{ScalarVector, ScalarVectorBuilder};
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use datatypes::types::json_type_value_to_string;
use datatypes::vectors::{BinaryVector, StringVectorBuilder};
use error::Result;
use futures::task::{Context, Poll};
use futures::{Stream, TryStreamExt};
pub use recordbatch::RecordBatch;
use snafu::{ensure, OptionExt, ResultExt};

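/// A stream of [RecordBatch]es that also exposes its schema, an optional
/// output ordering, and optional runtime metrics.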
pub trait RecordBatchStream: Stream<Item = Result<RecordBatch>> {
    fn name(&self) -> &str {
        "RecordBatchStream"
    }

    fn schema(&self) -> SchemaRef;

    fn output_ordering(&self) -> Option<&[OrderOption]>;

    fn metrics(&self) -> Option<RecordBatchMetrics>;
}

pub type SendableRecordBatchStream = Pin<Box<dyn RecordBatchStream + Send>>;

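/// A named column paired with Arrow [SortOptions], describing one key of a
/// stream's output ordering.
///
/// A minimal construction sketch (the column name `ts` is illustrative):
///
/// ```ignore
/// let order = OrderOption {
///     name: "ts".to_string(),
///     options: SortOptions { descending: true, nulls_first: false },
/// };
/// ```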
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct OrderOption {
    pub name: String,
    pub options: SortOptions,
}

/// A wrapper that maps one [RecordBatchStream] into another by applying a function
/// to each [RecordBatch].
///
/// The schema of the new stream is the inner stream's schema transformed by the
/// schema mapper function; the output ordering and the metrics are passed through
/// from the inner stream unchanged.
pub struct SendableRecordBatchMapper {
    inner: SendableRecordBatchStream,
    /// Maps each [RecordBatch] in the stream; receives the batch together with the
    /// original schema and the mapped schema.
    mapper: fn(RecordBatch, &SchemaRef, &SchemaRef) -> Result<RecordBatch>,
    /// The mapped schema, obtained by applying the schema mapper function to the
    /// inner stream's schema.
    schema: SchemaRef,
    /// Whether the mapper function needs to be applied; false when the schema
    /// mapper left the schema unchanged.
    apply_mapper: bool,
}

/// Maps JSON columns to string columns in the batch.
///
/// Each value of a JSON column is converted to its string representation; all other
/// columns are passed through unchanged, so the returned batch has the same number
/// of columns as the original.
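///
/// This function is shaped to serve as the `mapper` of a [SendableRecordBatchMapper],
/// paired with [map_json_type_to_string_schema] (see the example on
/// [SendableRecordBatchMapper::new]).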
pub fn map_json_type_to_string(
    batch: RecordBatch,
    original_schema: &SchemaRef,
    mapped_schema: &SchemaRef,
) -> Result<RecordBatch> {
    let mut vectors = Vec::with_capacity(original_schema.column_schemas().len());
    for (vector, schema) in batch.columns.iter().zip(original_schema.column_schemas()) {
        if let ConcreteDataType::Json(j) = &schema.data_type {
            // JSON values are physically stored in a binary vector; convert each
            // value to its string representation.
            let mut string_vector_builder = StringVectorBuilder::with_capacity(vector.len());
            let binary_vector = vector
                .as_any()
                .downcast_ref::<BinaryVector>()
                .with_context(|| error::DowncastVectorSnafu {
                    from_type: schema.data_type.clone(),
                    to_type: ConcreteDataType::binary_datatype(),
                })?;
            for value in binary_vector.iter_data() {
                let Some(value) = value else {
                    // Preserve nulls as-is.
                    string_vector_builder.push(None);
                    continue;
                };
                let string_value =
                    json_type_value_to_string(value, &j.format).with_context(|_| {
                        error::CastVectorSnafu {
                            from_type: schema.data_type.clone(),
                            to_type: ConcreteDataType::string_datatype(),
                        }
                    })?;
                string_vector_builder.push(Some(string_value.as_str()));
            }

            let string_vector = string_vector_builder.finish();
            vectors.push(Arc::new(string_vector) as VectorRef);
        } else {
            vectors.push(vector.clone());
        }
    }

    RecordBatch::new(mapped_schema.clone(), vectors)
}

/// Maps JSON columns to string columns in the schema.
///
/// The returned schema has the same number of columns as the original, with every
/// JSON column replaced by a string column of the same name and nullability.
///
/// Returns the new schema and whether any column was changed, i.e. whether batches
/// need to be mapped at all.
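///
/// A minimal usage sketch (assuming `schema` is a [SchemaRef] containing at least
/// one JSON column):
///
/// ```ignore
/// let (mapped_schema, apply_mapper) = map_json_type_to_string_schema(schema);
/// assert!(apply_mapper); // a JSON column was rewritten to string
/// ```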
pub fn map_json_type_to_string_schema(schema: SchemaRef) -> (SchemaRef, bool) {
    let mut new_columns = Vec::with_capacity(schema.column_schemas().len());
    let mut apply_mapper = false;
    for column in schema.column_schemas() {
        if matches!(column.data_type, ConcreteDataType::Json(_)) {
            new_columns.push(ColumnSchema::new(
                column.name.to_string(),
                ConcreteDataType::string_datatype(),
                column.is_nullable(),
            ));
            apply_mapper = true;
        } else {
            new_columns.push(column.clone());
        }
    }
    (Arc::new(Schema::new(new_columns)), apply_mapper)
}

impl SendableRecordBatchMapper {
    /// Creates a new [SendableRecordBatchMapper] with the given inner [RecordBatchStream],
    /// mapper function, and schema mapper function.
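    ///
    /// A minimal sketch (assuming `stream` is an existing [SendableRecordBatchStream]
    /// whose schema may contain JSON columns):
    ///
    /// ```ignore
    /// let mapped: SendableRecordBatchStream = Box::pin(SendableRecordBatchMapper::new(
    ///     stream,
    ///     map_json_type_to_string,
    ///     map_json_type_to_string_schema,
    /// ));
    /// ```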
    pub fn new(
        inner: SendableRecordBatchStream,
        mapper: fn(RecordBatch, &SchemaRef, &SchemaRef) -> Result<RecordBatch>,
        schema_mapper: fn(SchemaRef) -> (SchemaRef, bool),
    ) -> Self {
        let (mapped_schema, apply_mapper) = schema_mapper(inner.schema());
        Self {
            inner,
            mapper,
            schema: mapped_schema,
            apply_mapper,
        }
    }
}

impl RecordBatchStream for SendableRecordBatchMapper {
    fn name(&self) -> &str {
        "SendableRecordBatchMapper"
    }

    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

    fn output_ordering(&self) -> Option<&[OrderOption]> {
        self.inner.output_ordering()
    }

    fn metrics(&self) -> Option<RecordBatchMetrics> {
        self.inner.metrics()
    }
}

impl Stream for SendableRecordBatchMapper {
    type Item = Result<RecordBatch>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        if self.apply_mapper {
            // Only pay for the mapping when the schema mapper actually changed the schema.
            Pin::new(&mut self.inner).poll_next(cx).map(|opt| {
                opt.map(|result| {
                    result
                        .and_then(|batch| (self.mapper)(batch, &self.inner.schema(), &self.schema))
                })
            })
        } else {
            Pin::new(&mut self.inner).poll_next(cx)
        }
    }
}

/// An [EmptyRecordBatchStream] can be used to create a [RecordBatchStream]
/// that produces no results.
pub struct EmptyRecordBatchStream {
    /// Schema wrapped by Arc.
    schema: SchemaRef,
}

impl EmptyRecordBatchStream {
    /// Creates an empty [RecordBatchStream].
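    ///
    /// A usage sketch (assuming `schema` is an existing [SchemaRef]):
    ///
    /// ```ignore
    /// let empty: SendableRecordBatchStream = Box::pin(EmptyRecordBatchStream::new(schema));
    /// ```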
    pub fn new(schema: SchemaRef) -> Self {
        Self { schema }
    }
}

impl RecordBatchStream for EmptyRecordBatchStream {
    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

    fn output_ordering(&self) -> Option<&[OrderOption]> {
        None
    }

    fn metrics(&self) -> Option<RecordBatchMetrics> {
        None
    }
}

impl Stream for EmptyRecordBatchStream {
    type Item = Result<RecordBatch>;

    fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        Poll::Ready(None)
    }
}

#[derive(Debug, PartialEq)]
pub struct RecordBatches {
    schema: SchemaRef,
    batches: Vec<RecordBatch>,
}

impl RecordBatches {
    pub fn try_from_columns<I: IntoIterator<Item = VectorRef>>(
        schema: SchemaRef,
        columns: I,
    ) -> Result<Self> {
        let batches = vec![RecordBatch::new(schema.clone(), columns)?];
        Ok(Self { schema, batches })
    }

    pub async fn try_collect(stream: SendableRecordBatchStream) -> Result<Self> {
        let schema = stream.schema();
        let batches = stream.try_collect::<Vec<_>>().await?;
        Ok(Self { schema, batches })
    }

    #[inline]
    pub fn empty() -> Self {
        Self {
            schema: Arc::new(Schema::new(vec![])),
            batches: vec![],
        }
    }

    pub fn iter(&self) -> impl Iterator<Item = &RecordBatch> {
        self.batches.iter()
    }

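    /// Renders all batches as one human-readable ASCII table, e.g.:
    ///
    /// ```ignore
    /// // +---+-------+
    /// // | a | b     |
    /// // +---+-------+
    /// // | 1 | hello |
    /// // +---+-------+
    /// let table = batches.pretty_print()?;
    /// ```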
    pub fn pretty_print(&self) -> Result<String> {
        let df_batches = &self
            .iter()
            .map(|x| x.df_record_batch().clone())
            .collect::<Vec<_>>();
        let result = pretty::pretty_format_batches(df_batches).context(error::FormatSnafu)?;

        Ok(result.to_string())
    }

    pub fn try_new(schema: SchemaRef, batches: Vec<RecordBatch>) -> Result<Self> {
        for batch in &batches {
            ensure!(
                batch.schema == schema,
                error::CreateRecordBatchesSnafu {
                    reason: format!(
                        "expect RecordBatch schema equals {:?}, actual: {:?}",
                        schema, batch.schema
                    )
                }
            )
        }
        Ok(Self { schema, batches })
    }

    pub fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

    pub fn take(self) -> Vec<RecordBatch> {
        self.batches
    }

    pub fn as_stream(&self) -> SendableRecordBatchStream {
        Box::pin(SimpleRecordBatchStream {
            inner: RecordBatches {
                schema: self.schema(),
                batches: self.batches.clone(),
            },
            index: 0,
        })
    }
}

impl IntoIterator for RecordBatches {
    type Item = RecordBatch;
    type IntoIter = std::vec::IntoIter<Self::Item>;

    fn into_iter(self) -> Self::IntoIter {
        self.batches.into_iter()
    }
}

pub struct SimpleRecordBatchStream {
    inner: RecordBatches,
    index: usize,
}

impl RecordBatchStream for SimpleRecordBatchStream {
    fn schema(&self) -> SchemaRef {
        self.inner.schema()
    }

    fn output_ordering(&self) -> Option<&[OrderOption]> {
        None
    }

    fn metrics(&self) -> Option<RecordBatchMetrics> {
        None
    }
}

impl Stream for SimpleRecordBatchStream {
    type Item = Result<RecordBatch>;

    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        Poll::Ready(if self.index < self.inner.batches.len() {
            let batch = self.inner.batches[self.index].clone();
            self.index += 1;
            Some(Ok(batch))
        } else {
            None
        })
    }
}

/// Adapts a [Stream] of [RecordBatch] to a [RecordBatchStream].
pub struct RecordBatchStreamWrapper<S> {
    pub schema: SchemaRef,
    pub stream: S,
    pub output_ordering: Option<Vec<OrderOption>>,
    pub metrics: Arc<ArcSwapOption<RecordBatchMetrics>>,
}

impl<S> RecordBatchStreamWrapper<S> {
    /// Creates a [RecordBatchStreamWrapper] without output ordering requirement.
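    ///
    /// A minimal sketch (assuming `schema` and a compatible `stream` of
    /// `Result<RecordBatch>` items already exist):
    ///
    /// ```ignore
    /// let wrapper = RecordBatchStreamWrapper::new(schema, stream);
    /// let sendable: SendableRecordBatchStream = Box::pin(wrapper);
    /// ```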
    pub fn new(schema: SchemaRef, stream: S) -> RecordBatchStreamWrapper<S> {
        RecordBatchStreamWrapper {
            schema,
            stream,
            output_ordering: None,
            metrics: Default::default(),
        }
    }
}

impl<S: Stream<Item = Result<RecordBatch>> + Unpin> RecordBatchStream
    for RecordBatchStreamWrapper<S>
{
    fn name(&self) -> &str {
        "RecordBatchStreamWrapper"
    }

    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

    fn output_ordering(&self) -> Option<&[OrderOption]> {
        self.output_ordering.as_deref()
    }

    fn metrics(&self) -> Option<RecordBatchMetrics> {
        self.metrics.load().as_ref().map(|s| s.as_ref().clone())
    }
}

impl<S: Stream<Item = Result<RecordBatch>> + Unpin> Stream for RecordBatchStreamWrapper<S> {
    type Item = Result<RecordBatch>;

    fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        Pin::new(&mut self.stream).poll_next(ctx)
    }
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use datatypes::prelude::{ConcreteDataType, VectorRef};
    use datatypes::schema::{ColumnSchema, Schema};
    use datatypes::vectors::{BooleanVector, Int32Vector, StringVector};

    use super::*;

    #[test]
    fn test_recordbatches_try_from_columns() {
        let schema = Arc::new(Schema::new(vec![ColumnSchema::new(
            "a",
            ConcreteDataType::int32_datatype(),
            false,
        )]));
        let result = RecordBatches::try_from_columns(
            schema.clone(),
            vec![Arc::new(StringVector::from(vec!["hello", "world"])) as _],
        );
        assert!(result.is_err());

        let v: VectorRef = Arc::new(Int32Vector::from_slice([1, 2]));
        let expected = vec![RecordBatch::new(schema.clone(), vec![v.clone()]).unwrap()];
        let r = RecordBatches::try_from_columns(schema, vec![v]).unwrap();
        assert_eq!(r.take(), expected);
    }

    #[test]
    fn test_recordbatches_try_new() {
        let column_a = ColumnSchema::new("a", ConcreteDataType::int32_datatype(), false);
        let column_b = ColumnSchema::new("b", ConcreteDataType::string_datatype(), false);
        let column_c = ColumnSchema::new("c", ConcreteDataType::boolean_datatype(), false);

        let va: VectorRef = Arc::new(Int32Vector::from_slice([1, 2]));
        let vb: VectorRef = Arc::new(StringVector::from(vec!["hello", "world"]));
        let vc: VectorRef = Arc::new(BooleanVector::from(vec![true, false]));

        let schema1 = Arc::new(Schema::new(vec![column_a.clone(), column_b]));
        let batch1 = RecordBatch::new(schema1.clone(), vec![va.clone(), vb]).unwrap();

        let schema2 = Arc::new(Schema::new(vec![column_a, column_c]));
        let batch2 = RecordBatch::new(schema2.clone(), vec![va, vc]).unwrap();

        let result = RecordBatches::try_new(schema1.clone(), vec![batch1.clone(), batch2]);
        assert!(result.is_err());
        assert_eq!(
            result.unwrap_err().to_string(),
            format!(
                "Failed to create RecordBatches, reason: expect RecordBatch schema equals {schema1:?}, actual: {schema2:?}",
            )
        );

        let batches = RecordBatches::try_new(schema1.clone(), vec![batch1.clone()]).unwrap();
        let expected = "\
+---+-------+
| a | b     |
+---+-------+
| 1 | hello |
| 2 | world |
+---+-------+";
        assert_eq!(batches.pretty_print().unwrap(), expected);

        assert_eq!(schema1, batches.schema());
        assert_eq!(vec![batch1], batches.take());
    }

    #[tokio::test]
    async fn test_simple_recordbatch_stream() {
        let column_a = ColumnSchema::new("a", ConcreteDataType::int32_datatype(), false);
        let column_b = ColumnSchema::new("b", ConcreteDataType::string_datatype(), false);
        let schema = Arc::new(Schema::new(vec![column_a, column_b]));

        let va1: VectorRef = Arc::new(Int32Vector::from_slice([1, 2]));
        let vb1: VectorRef = Arc::new(StringVector::from(vec!["a", "b"]));
        let batch1 = RecordBatch::new(schema.clone(), vec![va1, vb1]).unwrap();

        let va2: VectorRef = Arc::new(Int32Vector::from_slice([3, 4, 5]));
        let vb2: VectorRef = Arc::new(StringVector::from(vec!["c", "d", "e"]));
        let batch2 = RecordBatch::new(schema.clone(), vec![va2, vb2]).unwrap();

        let recordbatches =
            RecordBatches::try_new(schema.clone(), vec![batch1.clone(), batch2.clone()]).unwrap();
        let stream = recordbatches.as_stream();
        let collected = util::collect(stream).await.unwrap();
        assert_eq!(collected.len(), 2);
        assert_eq!(collected[0], batch1);
        assert_eq!(collected[1], batch2);
    }
}
497}