common_recordbatch/lib.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#![feature(never_type)]

pub mod adapter;
pub mod cursor;
pub mod error;
pub mod filter;
mod recordbatch;
pub mod util;

use std::pin::Pin;
use std::sync::Arc;

use adapter::RecordBatchMetrics;
use arc_swap::ArcSwapOption;
pub use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
use datatypes::arrow::compute::SortOptions;
pub use datatypes::arrow::record_batch::RecordBatch as DfRecordBatch;
use datatypes::arrow::util::pretty;
use datatypes::prelude::{ConcreteDataType, VectorRef};
use datatypes::scalars::{ScalarVector, ScalarVectorBuilder};
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use datatypes::types::{JsonFormat, jsonb_to_string};
use datatypes::vectors::{BinaryVector, StringVectorBuilder};
use error::Result;
use futures::task::{Context, Poll};
use futures::{Stream, TryStreamExt};
pub use recordbatch::RecordBatch;
use snafu::{OptionExt, ResultExt, ensure};

/// A [Stream] of [RecordBatch]es that additionally carries its schema,
/// an optional output ordering, and optional runtime metrics.
pub trait RecordBatchStream: Stream<Item = Result<RecordBatch>> {
    fn name(&self) -> &str {
        "RecordBatchStream"
    }

    fn schema(&self) -> SchemaRef;

    fn output_ordering(&self) -> Option<&[OrderOption]>;

    fn metrics(&self) -> Option<RecordBatchMetrics>;
}

pub type SendableRecordBatchStream = Pin<Box<dyn RecordBatchStream + Send>>;

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct OrderOption {
    pub name: String,
    pub options: SortOptions,
}

/// A wrapper that maps a [RecordBatchStream] to a new [RecordBatchStream] by applying a
/// mapper function to each [RecordBatch].
///
/// The schema of the new stream is the inner stream's schema after applying the schema
/// mapper function. Output ordering and metrics are forwarded from the inner stream unchanged.
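///
/// # Example
///
/// A minimal sketch of mapping JSON columns to strings; `inner` stands for an
/// existing [SendableRecordBatchStream] built elsewhere (hypothetical binding):
///
/// ```ignore
/// let mapped: SendableRecordBatchStream = Box::pin(SendableRecordBatchMapper::new(
///     inner,
///     map_json_type_to_string,
///     map_json_type_to_string_schema,
/// ));
/// ```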
pub struct SendableRecordBatchMapper {
    inner: SendableRecordBatchStream,
    /// Mapper function applied to each [RecordBatch] in the stream. It receives the
    /// batch together with the original schema and the mapped schema.
    mapper: fn(RecordBatch, &SchemaRef, &SchemaRef) -> Result<RecordBatch>,
    /// Schema of the mapped stream, produced by the schema mapper function.
    schema: SchemaRef,
    /// Whether the mapper function needs to be applied; `false` when the schema
    /// mapper left the schema unchanged.
    apply_mapper: bool,
}

/// Converts all JSON columns in the batch to string columns.
///
/// The returned batch has the same number of columns as the original one, with every
/// JSONB value rendered as a string; non-JSON columns are passed through unchanged.
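///
/// # Example
///
/// Usually installed via [SendableRecordBatchMapper] rather than called directly; a
/// direct-call sketch, assuming `batch` and `original_schema` are prebuilt elsewhere
/// (hypothetical bindings):
///
/// ```ignore
/// let (mapped_schema, _apply) = map_json_type_to_string_schema(original_schema.clone());
/// let mapped = map_json_type_to_string(batch, &original_schema, &mapped_schema)?;
/// ```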
pub fn map_json_type_to_string(
    batch: RecordBatch,
    original_schema: &SchemaRef,
    mapped_schema: &SchemaRef,
) -> Result<RecordBatch> {
    let mut vectors = Vec::with_capacity(original_schema.column_schemas().len());
    for (vector, schema) in batch.columns.iter().zip(original_schema.column_schemas()) {
        if let ConcreteDataType::Json(j) = &schema.data_type {
            if matches!(&j.format, JsonFormat::Jsonb) {
                let mut string_vector_builder = StringVectorBuilder::with_capacity(vector.len());
                let binary_vector = vector
                    .as_any()
                    .downcast_ref::<BinaryVector>()
                    .with_context(|| error::DowncastVectorSnafu {
                        from_type: schema.data_type.clone(),
                        to_type: ConcreteDataType::binary_datatype(),
                    })?;
                for value in binary_vector.iter_data() {
                    let Some(value) = value else {
                        string_vector_builder.push(None);
                        continue;
                    };
                    let string_value =
                        jsonb_to_string(value).with_context(|_| error::CastVectorSnafu {
                            from_type: schema.data_type.clone(),
                            to_type: ConcreteDataType::string_datatype(),
                        })?;
                    string_vector_builder.push(Some(string_value.as_str()));
                }

                let string_vector = string_vector_builder.finish();
                vectors.push(Arc::new(string_vector) as VectorRef);
            } else {
                vectors.push(vector.clone());
            }
        } else {
            vectors.push(vector.clone());
        }
    }

    RecordBatch::new(mapped_schema.clone(), vectors)
}

/// Replaces all JSON columns in the schema with string columns.
///
/// The returned schema has the same number of columns as the original one. The boolean
/// flag indicates whether any column was replaced, i.e. whether the batch mapper needs
/// to be applied at all.
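///
/// # Example
///
/// A minimal sketch, assuming the `datatypes` crate exposes a `json_datatype()`
/// constructor for the JSONB-backed JSON type (an assumption; it is not shown in
/// this file):
///
/// ```ignore
/// let schema = Arc::new(Schema::new(vec![
///     ColumnSchema::new("payload", ConcreteDataType::json_datatype(), true),
/// ]));
/// let (mapped, apply) = map_json_type_to_string_schema(schema);
/// assert!(apply); // the JSON column was replaced by a string column
/// assert_eq!(
///     mapped.column_schemas()[0].data_type,
///     ConcreteDataType::string_datatype()
/// );
/// ```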
pub fn map_json_type_to_string_schema(schema: SchemaRef) -> (SchemaRef, bool) {
    let mut new_columns = Vec::with_capacity(schema.column_schemas().len());
    let mut apply_mapper = false;
    for column in schema.column_schemas() {
        if matches!(column.data_type, ConcreteDataType::Json(_)) {
            new_columns.push(ColumnSchema::new(
                column.name.clone(),
                ConcreteDataType::string_datatype(),
                column.is_nullable(),
            ));
            apply_mapper = true;
        } else {
            new_columns.push(column.clone());
        }
    }
    (Arc::new(Schema::new(new_columns)), apply_mapper)
}

impl SendableRecordBatchMapper {
    /// Creates a new [SendableRecordBatchMapper] with the given inner [RecordBatchStream],
    /// mapper function, and schema mapper function.
    pub fn new(
        inner: SendableRecordBatchStream,
        mapper: fn(RecordBatch, &SchemaRef, &SchemaRef) -> Result<RecordBatch>,
        schema_mapper: fn(SchemaRef) -> (SchemaRef, bool),
    ) -> Self {
        let (mapped_schema, apply_mapper) = schema_mapper(inner.schema());
        Self {
            inner,
            mapper,
            schema: mapped_schema,
            apply_mapper,
        }
    }
}

impl RecordBatchStream for SendableRecordBatchMapper {
    fn name(&self) -> &str {
        "SendableRecordBatchMapper"
    }

    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

    fn output_ordering(&self) -> Option<&[OrderOption]> {
        self.inner.output_ordering()
    }

    fn metrics(&self) -> Option<RecordBatchMetrics> {
        self.inner.metrics()
    }
}

impl Stream for SendableRecordBatchMapper {
    type Item = Result<RecordBatch>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        if self.apply_mapper {
            Pin::new(&mut self.inner).poll_next(cx).map(|opt| {
                opt.map(|result| {
                    result
                        .and_then(|batch| (self.mapper)(batch, &self.inner.schema(), &self.schema))
                })
            })
        } else {
            Pin::new(&mut self.inner).poll_next(cx)
        }
    }
}

/// EmptyRecordBatchStream can be used to create a RecordBatchStream
/// that will produce no results
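///
/// # Example
///
/// A minimal sketch; `schema` stands for any prebuilt [SchemaRef] (hypothetical binding):
///
/// ```ignore
/// let stream: SendableRecordBatchStream = Box::pin(EmptyRecordBatchStream::new(schema));
/// // Collecting the stream yields no batches.
/// ```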
pub struct EmptyRecordBatchStream {
    /// Schema wrapped by Arc
    schema: SchemaRef,
}

impl EmptyRecordBatchStream {
    /// Create an empty RecordBatchStream
    pub fn new(schema: SchemaRef) -> Self {
        Self { schema }
    }
}

impl RecordBatchStream for EmptyRecordBatchStream {
    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

    fn output_ordering(&self) -> Option<&[OrderOption]> {
        None
    }

    fn metrics(&self) -> Option<RecordBatchMetrics> {
        None
    }
}

impl Stream for EmptyRecordBatchStream {
    type Item = Result<RecordBatch>;

    fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        Poll::Ready(None)
    }
}

#[derive(Debug, PartialEq)]
pub struct RecordBatches {
    schema: SchemaRef,
    batches: Vec<RecordBatch>,
}

impl RecordBatches {
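    /// Creates a [RecordBatches] holding a single batch built from the given columns.
    ///
    /// A minimal sketch mirroring the unit test at the bottom of this file:
    ///
    /// ```ignore
    /// let schema = Arc::new(Schema::new(vec![ColumnSchema::new(
    ///     "a",
    ///     ConcreteDataType::int32_datatype(),
    ///     false,
    /// )]));
    /// let v: VectorRef = Arc::new(Int32Vector::from_slice([1, 2]));
    /// let batches = RecordBatches::try_from_columns(schema, vec![v])?;
    /// ```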
    pub fn try_from_columns<I: IntoIterator<Item = VectorRef>>(
        schema: SchemaRef,
        columns: I,
    ) -> Result<Self> {
        let batches = vec![RecordBatch::new(schema.clone(), columns)?];
        Ok(Self { schema, batches })
    }

    pub async fn try_collect(stream: SendableRecordBatchStream) -> Result<Self> {
        let schema = stream.schema();
        let batches = stream.try_collect::<Vec<_>>().await?;
        Ok(Self { schema, batches })
    }

    #[inline]
    pub fn empty() -> Self {
        Self {
            schema: Arc::new(Schema::new(vec![])),
            batches: vec![],
        }
    }

    pub fn iter(&self) -> impl Iterator<Item = &RecordBatch> {
        self.batches.iter()
    }

    pub fn pretty_print(&self) -> Result<String> {
        let df_batches = &self
            .iter()
            .map(|x| x.df_record_batch().clone())
            .collect::<Vec<_>>();
        let result = pretty::pretty_format_batches(df_batches).context(error::FormatSnafu)?;

        Ok(result.to_string())
    }

    pub fn try_new(schema: SchemaRef, batches: Vec<RecordBatch>) -> Result<Self> {
        for batch in &batches {
            ensure!(
                batch.schema == schema,
                error::CreateRecordBatchesSnafu {
                    reason: format!(
                        "expect RecordBatch schema equals {:?}, actual: {:?}",
                        schema, batch.schema
                    )
                }
            )
        }
        Ok(Self { schema, batches })
    }

    pub fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

    pub fn take(self) -> Vec<RecordBatch> {
        self.batches
    }

    pub fn as_stream(&self) -> SendableRecordBatchStream {
        Box::pin(SimpleRecordBatchStream {
            inner: RecordBatches {
                schema: self.schema(),
                batches: self.batches.clone(),
            },
            index: 0,
        })
    }
}

impl IntoIterator for RecordBatches {
    type Item = RecordBatch;
    type IntoIter = std::vec::IntoIter<Self::Item>;

    fn into_iter(self) -> Self::IntoIter {
        self.batches.into_iter()
    }
}

pub struct SimpleRecordBatchStream {
    inner: RecordBatches,
    index: usize,
}

impl RecordBatchStream for SimpleRecordBatchStream {
    fn schema(&self) -> SchemaRef {
        self.inner.schema()
    }

    fn output_ordering(&self) -> Option<&[OrderOption]> {
        None
    }

    fn metrics(&self) -> Option<RecordBatchMetrics> {
        None
    }
}

impl Stream for SimpleRecordBatchStream {
    type Item = Result<RecordBatch>;

    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        Poll::Ready(if self.index < self.inner.batches.len() {
            let batch = self.inner.batches[self.index].clone();
            self.index += 1;
            Some(Ok(batch))
        } else {
            None
        })
    }
}

/// Adapts a [Stream] of [RecordBatch] to a [RecordBatchStream].
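///
/// # Example
///
/// A minimal sketch wrapping an in-memory stream; `schema` and `batch` stand for a
/// prebuilt [SchemaRef] and a matching [RecordBatch] (hypothetical bindings):
///
/// ```ignore
/// let stream = futures::stream::iter(vec![Ok(batch)]);
/// let wrapped: SendableRecordBatchStream =
///     Box::pin(RecordBatchStreamWrapper::new(schema, stream));
/// ```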
pub struct RecordBatchStreamWrapper<S> {
    pub schema: SchemaRef,
    pub stream: S,
    pub output_ordering: Option<Vec<OrderOption>>,
    pub metrics: Arc<ArcSwapOption<RecordBatchMetrics>>,
}

impl<S> RecordBatchStreamWrapper<S> {
    /// Creates a [RecordBatchStreamWrapper] without output ordering requirement.
    pub fn new(schema: SchemaRef, stream: S) -> RecordBatchStreamWrapper<S> {
        RecordBatchStreamWrapper {
            schema,
            stream,
            output_ordering: None,
            metrics: Default::default(),
        }
    }
}

impl<S: Stream<Item = Result<RecordBatch>> + Unpin> RecordBatchStream
    for RecordBatchStreamWrapper<S>
{
    fn name(&self) -> &str {
        "RecordBatchStreamWrapper"
    }

    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

    fn output_ordering(&self) -> Option<&[OrderOption]> {
        self.output_ordering.as_deref()
    }

    fn metrics(&self) -> Option<RecordBatchMetrics> {
        self.metrics.load().as_ref().map(|s| s.as_ref().clone())
    }
}

impl<S: Stream<Item = Result<RecordBatch>> + Unpin> Stream for RecordBatchStreamWrapper<S> {
    type Item = Result<RecordBatch>;

    fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        Pin::new(&mut self.stream).poll_next(ctx)
    }
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use datatypes::prelude::{ConcreteDataType, VectorRef};
    use datatypes::schema::{ColumnSchema, Schema};
    use datatypes::vectors::{BooleanVector, Int32Vector, StringVector};

    use super::*;

    #[test]
    fn test_recordbatches_try_from_columns() {
        let schema = Arc::new(Schema::new(vec![ColumnSchema::new(
            "a",
            ConcreteDataType::int32_datatype(),
            false,
        )]));
        let result = RecordBatches::try_from_columns(
            schema.clone(),
            vec![Arc::new(StringVector::from(vec!["hello", "world"])) as _],
        );
        assert!(result.is_err());

        let v: VectorRef = Arc::new(Int32Vector::from_slice([1, 2]));
        let expected = vec![RecordBatch::new(schema.clone(), vec![v.clone()]).unwrap()];
        let r = RecordBatches::try_from_columns(schema, vec![v]).unwrap();
        assert_eq!(r.take(), expected);
    }

    #[test]
    fn test_recordbatches_try_new() {
        let column_a = ColumnSchema::new("a", ConcreteDataType::int32_datatype(), false);
        let column_b = ColumnSchema::new("b", ConcreteDataType::string_datatype(), false);
        let column_c = ColumnSchema::new("c", ConcreteDataType::boolean_datatype(), false);

        let va: VectorRef = Arc::new(Int32Vector::from_slice([1, 2]));
        let vb: VectorRef = Arc::new(StringVector::from(vec!["hello", "world"]));
        let vc: VectorRef = Arc::new(BooleanVector::from(vec![true, false]));

        let schema1 = Arc::new(Schema::new(vec![column_a.clone(), column_b]));
        let batch1 = RecordBatch::new(schema1.clone(), vec![va.clone(), vb]).unwrap();

        let schema2 = Arc::new(Schema::new(vec![column_a, column_c]));
        let batch2 = RecordBatch::new(schema2.clone(), vec![va, vc]).unwrap();

        let result = RecordBatches::try_new(schema1.clone(), vec![batch1.clone(), batch2]);
        assert!(result.is_err());
        assert_eq!(
            result.unwrap_err().to_string(),
            format!(
                "Failed to create RecordBatches, reason: expect RecordBatch schema equals {schema1:?}, actual: {schema2:?}",
            )
        );

        let batches = RecordBatches::try_new(schema1.clone(), vec![batch1.clone()]).unwrap();
        let expected = "\
+---+-------+
| a | b     |
+---+-------+
| 1 | hello |
| 2 | world |
+---+-------+";
        assert_eq!(batches.pretty_print().unwrap(), expected);

        assert_eq!(schema1, batches.schema());
        assert_eq!(vec![batch1], batches.take());
    }

    #[tokio::test]
    async fn test_simple_recordbatch_stream() {
        let column_a = ColumnSchema::new("a", ConcreteDataType::int32_datatype(), false);
        let column_b = ColumnSchema::new("b", ConcreteDataType::string_datatype(), false);
        let schema = Arc::new(Schema::new(vec![column_a, column_b]));

        let va1: VectorRef = Arc::new(Int32Vector::from_slice([1, 2]));
        let vb1: VectorRef = Arc::new(StringVector::from(vec!["a", "b"]));
        let batch1 = RecordBatch::new(schema.clone(), vec![va1, vb1]).unwrap();

        let va2: VectorRef = Arc::new(Int32Vector::from_slice([3, 4, 5]));
        let vb2: VectorRef = Arc::new(StringVector::from(vec!["c", "d", "e"]));
        let batch2 = RecordBatch::new(schema.clone(), vec![va2, vb2]).unwrap();

        let recordbatches =
            RecordBatches::try_new(schema.clone(), vec![batch1.clone(), batch2.clone()]).unwrap();
        let stream = recordbatches.as_stream();
        let collected = util::collect(stream).await.unwrap();
        assert_eq!(collected.len(), 2);
        assert_eq!(collected[0], batch1);
        assert_eq!(collected[1], batch2);
    }
}
499}