1#![feature(never_type)]
16
17pub mod adapter;
18pub mod cursor;
19pub mod error;
20pub mod filter;
21mod recordbatch;
22pub mod util;
23
24use std::pin::Pin;
25use std::sync::Arc;
26
27use adapter::RecordBatchMetrics;
28use arc_swap::ArcSwapOption;
29pub use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
30use datatypes::arrow::compute::SortOptions;
31pub use datatypes::arrow::record_batch::RecordBatch as DfRecordBatch;
32use datatypes::arrow::util::pretty;
33use datatypes::prelude::{ConcreteDataType, VectorRef};
34use datatypes::scalars::{ScalarVector, ScalarVectorBuilder};
35use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
36use datatypes::types::{JsonFormat, jsonb_to_string};
37use datatypes::vectors::{BinaryVector, StringVectorBuilder};
38use error::Result;
39use futures::task::{Context, Poll};
40use futures::{Stream, TryStreamExt};
41pub use recordbatch::RecordBatch;
42use snafu::{OptionExt, ResultExt, ensure};
43
44pub trait RecordBatchStream: Stream<Item = Result<RecordBatch>> {
45 fn name(&self) -> &str {
46 "RecordBatchStream"
47 }
48
49 fn schema(&self) -> SchemaRef;
50
51 fn output_ordering(&self) -> Option<&[OrderOption]>;
52
53 fn metrics(&self) -> Option<RecordBatchMetrics>;
54}
55
56pub type SendableRecordBatchStream = Pin<Box<dyn RecordBatchStream + Send>>;
57
58#[derive(Debug, Clone, PartialEq, Eq)]
59pub struct OrderOption {
60 pub name: String,
61 pub options: SortOptions,
62}
63
64pub struct SendableRecordBatchMapper {
71 inner: SendableRecordBatchStream,
72 mapper: fn(RecordBatch, &SchemaRef, &SchemaRef) -> Result<RecordBatch>,
75 schema: SchemaRef,
77 apply_mapper: bool,
79}
80
81pub fn map_json_type_to_string(
87 batch: RecordBatch,
88 original_schema: &SchemaRef,
89 mapped_schema: &SchemaRef,
90) -> Result<RecordBatch> {
91 let mut vectors = Vec::with_capacity(original_schema.column_schemas().len());
92 for (vector, schema) in batch.columns.iter().zip(original_schema.column_schemas()) {
93 if let ConcreteDataType::Json(j) = &schema.data_type {
94 if matches!(&j.format, JsonFormat::Jsonb) {
95 let mut string_vector_builder = StringVectorBuilder::with_capacity(vector.len());
96 let binary_vector = vector
97 .as_any()
98 .downcast_ref::<BinaryVector>()
99 .with_context(|| error::DowncastVectorSnafu {
100 from_type: schema.data_type.clone(),
101 to_type: ConcreteDataType::binary_datatype(),
102 })?;
103 for value in binary_vector.iter_data() {
104 let Some(value) = value else {
105 string_vector_builder.push(None);
106 continue;
107 };
108 let string_value =
109 jsonb_to_string(value).with_context(|_| error::CastVectorSnafu {
110 from_type: schema.data_type.clone(),
111 to_type: ConcreteDataType::string_datatype(),
112 })?;
113 string_vector_builder.push(Some(string_value.as_str()));
114 }
115
116 let string_vector = string_vector_builder.finish();
117 vectors.push(Arc::new(string_vector) as VectorRef);
118 } else {
119 vectors.push(vector.clone());
120 }
121 } else {
122 vectors.push(vector.clone());
123 }
124 }
125
126 RecordBatch::new(mapped_schema.clone(), vectors)
127}
128
129pub fn map_json_type_to_string_schema(schema: SchemaRef) -> (SchemaRef, bool) {
137 let mut new_columns = Vec::with_capacity(schema.column_schemas().len());
138 let mut apply_mapper = false;
139 for column in schema.column_schemas() {
140 if matches!(column.data_type, ConcreteDataType::Json(_)) {
141 new_columns.push(ColumnSchema::new(
142 column.name.clone(),
143 ConcreteDataType::string_datatype(),
144 column.is_nullable(),
145 ));
146 apply_mapper = true;
147 } else {
148 new_columns.push(column.clone());
149 }
150 }
151 (Arc::new(Schema::new(new_columns)), apply_mapper)
152}
153
154impl SendableRecordBatchMapper {
155 pub fn new(
157 inner: SendableRecordBatchStream,
158 mapper: fn(RecordBatch, &SchemaRef, &SchemaRef) -> Result<RecordBatch>,
159 schema_mapper: fn(SchemaRef) -> (SchemaRef, bool),
160 ) -> Self {
161 let (mapped_schema, apply_mapper) = schema_mapper(inner.schema());
162 Self {
163 inner,
164 mapper,
165 schema: mapped_schema,
166 apply_mapper,
167 }
168 }
169}
170
171impl RecordBatchStream for SendableRecordBatchMapper {
172 fn name(&self) -> &str {
173 "SendableRecordBatchMapper"
174 }
175
176 fn schema(&self) -> SchemaRef {
177 self.schema.clone()
178 }
179
180 fn output_ordering(&self) -> Option<&[OrderOption]> {
181 self.inner.output_ordering()
182 }
183
184 fn metrics(&self) -> Option<RecordBatchMetrics> {
185 self.inner.metrics()
186 }
187}
188
189impl Stream for SendableRecordBatchMapper {
190 type Item = Result<RecordBatch>;
191
192 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
193 if self.apply_mapper {
194 Pin::new(&mut self.inner).poll_next(cx).map(|opt| {
195 opt.map(|result| {
196 result
197 .and_then(|batch| (self.mapper)(batch, &self.inner.schema(), &self.schema))
198 })
199 })
200 } else {
201 Pin::new(&mut self.inner).poll_next(cx)
202 }
203 }
204}
205
206pub struct EmptyRecordBatchStream {
209 schema: SchemaRef,
211}
212
213impl EmptyRecordBatchStream {
214 pub fn new(schema: SchemaRef) -> Self {
216 Self { schema }
217 }
218}
219
220impl RecordBatchStream for EmptyRecordBatchStream {
221 fn schema(&self) -> SchemaRef {
222 self.schema.clone()
223 }
224
225 fn output_ordering(&self) -> Option<&[OrderOption]> {
226 None
227 }
228
229 fn metrics(&self) -> Option<RecordBatchMetrics> {
230 None
231 }
232}
233
234impl Stream for EmptyRecordBatchStream {
235 type Item = Result<RecordBatch>;
236
237 fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
238 Poll::Ready(None)
239 }
240}
241
242#[derive(Debug, PartialEq)]
243pub struct RecordBatches {
244 schema: SchemaRef,
245 batches: Vec<RecordBatch>,
246}
247
248impl RecordBatches {
249 pub fn try_from_columns<I: IntoIterator<Item = VectorRef>>(
250 schema: SchemaRef,
251 columns: I,
252 ) -> Result<Self> {
253 let batches = vec![RecordBatch::new(schema.clone(), columns)?];
254 Ok(Self { schema, batches })
255 }
256
257 pub async fn try_collect(stream: SendableRecordBatchStream) -> Result<Self> {
258 let schema = stream.schema();
259 let batches = stream.try_collect::<Vec<_>>().await?;
260 Ok(Self { schema, batches })
261 }
262
263 #[inline]
264 pub fn empty() -> Self {
265 Self {
266 schema: Arc::new(Schema::new(vec![])),
267 batches: vec![],
268 }
269 }
270
271 pub fn iter(&self) -> impl Iterator<Item = &RecordBatch> {
272 self.batches.iter()
273 }
274
275 pub fn pretty_print(&self) -> Result<String> {
276 let df_batches = &self
277 .iter()
278 .map(|x| x.df_record_batch().clone())
279 .collect::<Vec<_>>();
280 let result = pretty::pretty_format_batches(df_batches).context(error::FormatSnafu)?;
281
282 Ok(result.to_string())
283 }
284
285 pub fn try_new(schema: SchemaRef, batches: Vec<RecordBatch>) -> Result<Self> {
286 for batch in &batches {
287 ensure!(
288 batch.schema == schema,
289 error::CreateRecordBatchesSnafu {
290 reason: format!(
291 "expect RecordBatch schema equals {:?}, actual: {:?}",
292 schema, batch.schema
293 )
294 }
295 )
296 }
297 Ok(Self { schema, batches })
298 }
299
300 pub fn schema(&self) -> SchemaRef {
301 self.schema.clone()
302 }
303
304 pub fn take(self) -> Vec<RecordBatch> {
305 self.batches
306 }
307
308 pub fn as_stream(&self) -> SendableRecordBatchStream {
309 Box::pin(SimpleRecordBatchStream {
310 inner: RecordBatches {
311 schema: self.schema(),
312 batches: self.batches.clone(),
313 },
314 index: 0,
315 })
316 }
317}
318
319impl IntoIterator for RecordBatches {
320 type Item = RecordBatch;
321 type IntoIter = std::vec::IntoIter<Self::Item>;
322
323 fn into_iter(self) -> Self::IntoIter {
324 self.batches.into_iter()
325 }
326}
327
328pub struct SimpleRecordBatchStream {
329 inner: RecordBatches,
330 index: usize,
331}
332
333impl RecordBatchStream for SimpleRecordBatchStream {
334 fn schema(&self) -> SchemaRef {
335 self.inner.schema()
336 }
337
338 fn output_ordering(&self) -> Option<&[OrderOption]> {
339 None
340 }
341
342 fn metrics(&self) -> Option<RecordBatchMetrics> {
343 None
344 }
345}
346
347impl Stream for SimpleRecordBatchStream {
348 type Item = Result<RecordBatch>;
349
350 fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
351 Poll::Ready(if self.index < self.inner.batches.len() {
352 let batch = self.inner.batches[self.index].clone();
353 self.index += 1;
354 Some(Ok(batch))
355 } else {
356 None
357 })
358 }
359}
360
361pub struct RecordBatchStreamWrapper<S> {
363 pub schema: SchemaRef,
364 pub stream: S,
365 pub output_ordering: Option<Vec<OrderOption>>,
366 pub metrics: Arc<ArcSwapOption<RecordBatchMetrics>>,
367}
368
369impl<S> RecordBatchStreamWrapper<S> {
370 pub fn new(schema: SchemaRef, stream: S) -> RecordBatchStreamWrapper<S> {
372 RecordBatchStreamWrapper {
373 schema,
374 stream,
375 output_ordering: None,
376 metrics: Default::default(),
377 }
378 }
379}
380
381impl<S: Stream<Item = Result<RecordBatch>> + Unpin> RecordBatchStream
382 for RecordBatchStreamWrapper<S>
383{
384 fn name(&self) -> &str {
385 "RecordBatchStreamWrapper"
386 }
387
388 fn schema(&self) -> SchemaRef {
389 self.schema.clone()
390 }
391
392 fn output_ordering(&self) -> Option<&[OrderOption]> {
393 self.output_ordering.as_deref()
394 }
395
396 fn metrics(&self) -> Option<RecordBatchMetrics> {
397 self.metrics.load().as_ref().map(|s| s.as_ref().clone())
398 }
399}
400
401impl<S: Stream<Item = Result<RecordBatch>> + Unpin> Stream for RecordBatchStreamWrapper<S> {
402 type Item = Result<RecordBatch>;
403
404 fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
405 Pin::new(&mut self.stream).poll_next(ctx)
406 }
407}
408
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use datatypes::prelude::{ConcreteDataType, VectorRef};
    use datatypes::schema::{ColumnSchema, Schema};
    use datatypes::vectors::{BooleanVector, Int32Vector, StringVector};

    use super::*;

    /// `try_from_columns` rejects columns that do not match the schema and
    /// otherwise produces exactly one batch built from the given columns.
    #[test]
    fn test_recordbatches_try_from_columns() {
        let schema = Arc::new(Schema::new(vec![ColumnSchema::new(
            "a",
            ConcreteDataType::int32_datatype(),
            false,
        )]));
        // A string column for the int32 column "a" is expected to fail.
        let result = RecordBatches::try_from_columns(
            schema.clone(),
            vec![Arc::new(StringVector::from(vec!["hello", "world"])) as _],
        );
        assert!(result.is_err());

        let v: VectorRef = Arc::new(Int32Vector::from_slice([1, 2]));
        let expected = vec![RecordBatch::new(schema.clone(), vec![v.clone()]).unwrap()];
        let r = RecordBatches::try_from_columns(schema, vec![v]).unwrap();
        assert_eq!(r.take(), expected);
    }

    /// `try_new` rejects a batch whose schema differs from the collection
    /// schema (checking the exact error text), and a valid collection
    /// pretty-prints, reports its schema and returns its batches.
    #[test]
    fn test_recordbatches_try_new() {
        let column_a = ColumnSchema::new("a", ConcreteDataType::int32_datatype(), false);
        let column_b = ColumnSchema::new("b", ConcreteDataType::string_datatype(), false);
        let column_c = ColumnSchema::new("c", ConcreteDataType::boolean_datatype(), false);

        let va: VectorRef = Arc::new(Int32Vector::from_slice([1, 2]));
        let vb: VectorRef = Arc::new(StringVector::from(vec!["hello", "world"]));
        let vc: VectorRef = Arc::new(BooleanVector::from(vec![true, false]));

        let schema1 = Arc::new(Schema::new(vec![column_a.clone(), column_b]));
        let batch1 = RecordBatch::new(schema1.clone(), vec![va.clone(), vb]).unwrap();

        // batch2 uses a different schema (column "c" instead of "b").
        let schema2 = Arc::new(Schema::new(vec![column_a, column_c]));
        let batch2 = RecordBatch::new(schema2.clone(), vec![va, vc]).unwrap();

        let result = RecordBatches::try_new(schema1.clone(), vec![batch1.clone(), batch2]);
        assert!(result.is_err());
        assert_eq!(
            result.unwrap_err().to_string(),
            format!(
                "Failed to create RecordBatches, reason: expect RecordBatch schema equals {schema1:?}, actual: {schema2:?}",
            )
        );

        let batches = RecordBatches::try_new(schema1.clone(), vec![batch1.clone()]).unwrap();
        // Arrow's pretty printer pads every cell to its column's width.
        let expected = "\
+---+-------+
| a | b     |
+---+-------+
| 1 | hello |
| 2 | world |
+---+-------+";
        assert_eq!(batches.pretty_print().unwrap(), expected);

        assert_eq!(schema1, batches.schema());
        assert_eq!(vec![batch1], batches.take());
    }

    /// `as_stream` yields every batch of the collection, in order.
    #[tokio::test]
    async fn test_simple_recordbatch_stream() {
        let column_a = ColumnSchema::new("a", ConcreteDataType::int32_datatype(), false);
        let column_b = ColumnSchema::new("b", ConcreteDataType::string_datatype(), false);
        let schema = Arc::new(Schema::new(vec![column_a, column_b]));

        let va1: VectorRef = Arc::new(Int32Vector::from_slice([1, 2]));
        let vb1: VectorRef = Arc::new(StringVector::from(vec!["a", "b"]));
        let batch1 = RecordBatch::new(schema.clone(), vec![va1, vb1]).unwrap();

        let va2: VectorRef = Arc::new(Int32Vector::from_slice([3, 4, 5]));
        let vb2: VectorRef = Arc::new(StringVector::from(vec!["c", "d", "e"]));
        let batch2 = RecordBatch::new(schema.clone(), vec![va2, vb2]).unwrap();

        let recordbatches =
            RecordBatches::try_new(schema.clone(), vec![batch1.clone(), batch2.clone()]).unwrap();
        let stream = recordbatches.as_stream();
        let collected = util::collect(stream).await.unwrap();
        assert_eq!(collected.len(), 2);
        assert_eq!(collected[0], batch1);
        assert_eq!(collected[1], batch2);
    }
}