1#![feature(never_type)]
16
17pub mod adapter;
18pub mod cursor;
19pub mod error;
20pub mod filter;
21mod recordbatch;
22pub mod util;
23
24use std::pin::Pin;
25use std::sync::Arc;
26
27use adapter::RecordBatchMetrics;
28use arc_swap::ArcSwapOption;
29pub use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
30use datatypes::arrow::compute::SortOptions;
31pub use datatypes::arrow::record_batch::RecordBatch as DfRecordBatch;
32use datatypes::arrow::util::pretty;
33use datatypes::prelude::{ConcreteDataType, VectorRef};
34use datatypes::scalars::{ScalarVector, ScalarVectorBuilder};
35use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
36use datatypes::types::json_type_value_to_string;
37use datatypes::vectors::{BinaryVector, StringVectorBuilder};
38use error::Result;
39use futures::task::{Context, Poll};
40use futures::{Stream, TryStreamExt};
41pub use recordbatch::RecordBatch;
42use snafu::{ensure, OptionExt, ResultExt};
43
44pub trait RecordBatchStream: Stream<Item = Result<RecordBatch>> {
45 fn name(&self) -> &str {
46 "RecordBatchStream"
47 }
48
49 fn schema(&self) -> SchemaRef;
50
51 fn output_ordering(&self) -> Option<&[OrderOption]>;
52
53 fn metrics(&self) -> Option<RecordBatchMetrics>;
54}
55
56pub type SendableRecordBatchStream = Pin<Box<dyn RecordBatchStream + Send>>;
57
58#[derive(Debug, Clone, PartialEq, Eq)]
59pub struct OrderOption {
60 pub name: String,
61 pub options: SortOptions,
62}
63
64pub struct SendableRecordBatchMapper {
71 inner: SendableRecordBatchStream,
72 mapper: fn(RecordBatch, &SchemaRef, &SchemaRef) -> Result<RecordBatch>,
75 schema: SchemaRef,
77 apply_mapper: bool,
79}
80
81pub fn map_json_type_to_string(
87 batch: RecordBatch,
88 original_schema: &SchemaRef,
89 mapped_schema: &SchemaRef,
90) -> Result<RecordBatch> {
91 let mut vectors = Vec::with_capacity(original_schema.column_schemas().len());
92 for (vector, schema) in batch.columns.iter().zip(original_schema.column_schemas()) {
93 if let ConcreteDataType::Json(j) = schema.data_type {
94 let mut string_vector_builder = StringVectorBuilder::with_capacity(vector.len());
95 let binary_vector = vector
96 .as_any()
97 .downcast_ref::<BinaryVector>()
98 .with_context(|| error::DowncastVectorSnafu {
99 from_type: schema.data_type.clone(),
100 to_type: ConcreteDataType::binary_datatype(),
101 })?;
102 for value in binary_vector.iter_data() {
103 let Some(value) = value else {
104 string_vector_builder.push(None);
105 continue;
106 };
107 let string_value =
108 json_type_value_to_string(value, &j.format).with_context(|_| {
109 error::CastVectorSnafu {
110 from_type: schema.data_type.clone(),
111 to_type: ConcreteDataType::string_datatype(),
112 }
113 })?;
114 string_vector_builder.push(Some(string_value.as_str()));
115 }
116
117 let string_vector = string_vector_builder.finish();
118 vectors.push(Arc::new(string_vector) as VectorRef);
119 } else {
120 vectors.push(vector.clone());
121 }
122 }
123
124 RecordBatch::new(mapped_schema.clone(), vectors)
125}
126
127pub fn map_json_type_to_string_schema(schema: SchemaRef) -> (SchemaRef, bool) {
135 let mut new_columns = Vec::with_capacity(schema.column_schemas().len());
136 let mut apply_mapper = false;
137 for column in schema.column_schemas() {
138 if matches!(column.data_type, ConcreteDataType::Json(_)) {
139 new_columns.push(ColumnSchema::new(
140 column.name.to_string(),
141 ConcreteDataType::string_datatype(),
142 column.is_nullable(),
143 ));
144 apply_mapper = true;
145 } else {
146 new_columns.push(column.clone());
147 }
148 }
149 (Arc::new(Schema::new(new_columns)), apply_mapper)
150}
151
152impl SendableRecordBatchMapper {
153 pub fn new(
155 inner: SendableRecordBatchStream,
156 mapper: fn(RecordBatch, &SchemaRef, &SchemaRef) -> Result<RecordBatch>,
157 schema_mapper: fn(SchemaRef) -> (SchemaRef, bool),
158 ) -> Self {
159 let (mapped_schema, apply_mapper) = schema_mapper(inner.schema());
160 Self {
161 inner,
162 mapper,
163 schema: mapped_schema,
164 apply_mapper,
165 }
166 }
167}
168
169impl RecordBatchStream for SendableRecordBatchMapper {
170 fn name(&self) -> &str {
171 "SendableRecordBatchMapper"
172 }
173
174 fn schema(&self) -> SchemaRef {
175 self.schema.clone()
176 }
177
178 fn output_ordering(&self) -> Option<&[OrderOption]> {
179 self.inner.output_ordering()
180 }
181
182 fn metrics(&self) -> Option<RecordBatchMetrics> {
183 self.inner.metrics()
184 }
185}
186
187impl Stream for SendableRecordBatchMapper {
188 type Item = Result<RecordBatch>;
189
190 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
191 if self.apply_mapper {
192 Pin::new(&mut self.inner).poll_next(cx).map(|opt| {
193 opt.map(|result| {
194 result
195 .and_then(|batch| (self.mapper)(batch, &self.inner.schema(), &self.schema))
196 })
197 })
198 } else {
199 Pin::new(&mut self.inner).poll_next(cx)
200 }
201 }
202}
203
204pub struct EmptyRecordBatchStream {
207 schema: SchemaRef,
209}
210
211impl EmptyRecordBatchStream {
212 pub fn new(schema: SchemaRef) -> Self {
214 Self { schema }
215 }
216}
217
218impl RecordBatchStream for EmptyRecordBatchStream {
219 fn schema(&self) -> SchemaRef {
220 self.schema.clone()
221 }
222
223 fn output_ordering(&self) -> Option<&[OrderOption]> {
224 None
225 }
226
227 fn metrics(&self) -> Option<RecordBatchMetrics> {
228 None
229 }
230}
231
232impl Stream for EmptyRecordBatchStream {
233 type Item = Result<RecordBatch>;
234
235 fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
236 Poll::Ready(None)
237 }
238}
239
240#[derive(Debug, PartialEq)]
241pub struct RecordBatches {
242 schema: SchemaRef,
243 batches: Vec<RecordBatch>,
244}
245
246impl RecordBatches {
247 pub fn try_from_columns<I: IntoIterator<Item = VectorRef>>(
248 schema: SchemaRef,
249 columns: I,
250 ) -> Result<Self> {
251 let batches = vec![RecordBatch::new(schema.clone(), columns)?];
252 Ok(Self { schema, batches })
253 }
254
255 pub async fn try_collect(stream: SendableRecordBatchStream) -> Result<Self> {
256 let schema = stream.schema();
257 let batches = stream.try_collect::<Vec<_>>().await?;
258 Ok(Self { schema, batches })
259 }
260
261 #[inline]
262 pub fn empty() -> Self {
263 Self {
264 schema: Arc::new(Schema::new(vec![])),
265 batches: vec![],
266 }
267 }
268
269 pub fn iter(&self) -> impl Iterator<Item = &RecordBatch> {
270 self.batches.iter()
271 }
272
273 pub fn pretty_print(&self) -> Result<String> {
274 let df_batches = &self
275 .iter()
276 .map(|x| x.df_record_batch().clone())
277 .collect::<Vec<_>>();
278 let result = pretty::pretty_format_batches(df_batches).context(error::FormatSnafu)?;
279
280 Ok(result.to_string())
281 }
282
283 pub fn try_new(schema: SchemaRef, batches: Vec<RecordBatch>) -> Result<Self> {
284 for batch in &batches {
285 ensure!(
286 batch.schema == schema,
287 error::CreateRecordBatchesSnafu {
288 reason: format!(
289 "expect RecordBatch schema equals {:?}, actual: {:?}",
290 schema, batch.schema
291 )
292 }
293 )
294 }
295 Ok(Self { schema, batches })
296 }
297
298 pub fn schema(&self) -> SchemaRef {
299 self.schema.clone()
300 }
301
302 pub fn take(self) -> Vec<RecordBatch> {
303 self.batches
304 }
305
306 pub fn as_stream(&self) -> SendableRecordBatchStream {
307 Box::pin(SimpleRecordBatchStream {
308 inner: RecordBatches {
309 schema: self.schema(),
310 batches: self.batches.clone(),
311 },
312 index: 0,
313 })
314 }
315}
316
317impl IntoIterator for RecordBatches {
318 type Item = RecordBatch;
319 type IntoIter = std::vec::IntoIter<Self::Item>;
320
321 fn into_iter(self) -> Self::IntoIter {
322 self.batches.into_iter()
323 }
324}
325
326pub struct SimpleRecordBatchStream {
327 inner: RecordBatches,
328 index: usize,
329}
330
331impl RecordBatchStream for SimpleRecordBatchStream {
332 fn schema(&self) -> SchemaRef {
333 self.inner.schema()
334 }
335
336 fn output_ordering(&self) -> Option<&[OrderOption]> {
337 None
338 }
339
340 fn metrics(&self) -> Option<RecordBatchMetrics> {
341 None
342 }
343}
344
345impl Stream for SimpleRecordBatchStream {
346 type Item = Result<RecordBatch>;
347
348 fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
349 Poll::Ready(if self.index < self.inner.batches.len() {
350 let batch = self.inner.batches[self.index].clone();
351 self.index += 1;
352 Some(Ok(batch))
353 } else {
354 None
355 })
356 }
357}
358
359pub struct RecordBatchStreamWrapper<S> {
361 pub schema: SchemaRef,
362 pub stream: S,
363 pub output_ordering: Option<Vec<OrderOption>>,
364 pub metrics: Arc<ArcSwapOption<RecordBatchMetrics>>,
365}
366
367impl<S> RecordBatchStreamWrapper<S> {
368 pub fn new(schema: SchemaRef, stream: S) -> RecordBatchStreamWrapper<S> {
370 RecordBatchStreamWrapper {
371 schema,
372 stream,
373 output_ordering: None,
374 metrics: Default::default(),
375 }
376 }
377}
378
379impl<S: Stream<Item = Result<RecordBatch>> + Unpin> RecordBatchStream
380 for RecordBatchStreamWrapper<S>
381{
382 fn name(&self) -> &str {
383 "RecordBatchStreamWrapper"
384 }
385
386 fn schema(&self) -> SchemaRef {
387 self.schema.clone()
388 }
389
390 fn output_ordering(&self) -> Option<&[OrderOption]> {
391 self.output_ordering.as_deref()
392 }
393
394 fn metrics(&self) -> Option<RecordBatchMetrics> {
395 self.metrics.load().as_ref().map(|s| s.as_ref().clone())
396 }
397}
398
399impl<S: Stream<Item = Result<RecordBatch>> + Unpin> Stream for RecordBatchStreamWrapper<S> {
400 type Item = Result<RecordBatch>;
401
402 fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
403 Pin::new(&mut self.stream).poll_next(ctx)
404 }
405}
406
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use datatypes::prelude::{ConcreteDataType, VectorRef};
    use datatypes::schema::{ColumnSchema, Schema};
    use datatypes::vectors::{BooleanVector, Int32Vector, StringVector};

    use super::*;

    /// `try_from_columns` rejects columns that do not fit the schema and
    /// otherwise produces exactly one batch.
    #[test]
    fn test_recordbatches_try_from_columns() {
        let schema = Arc::new(Schema::new(vec![ColumnSchema::new(
            "a",
            ConcreteDataType::int32_datatype(),
            false,
        )]));

        // A string vector does not match the int32 column.
        let result = RecordBatches::try_from_columns(
            schema.clone(),
            vec![Arc::new(StringVector::from(vec!["hello", "world"])) as _],
        );
        assert!(result.is_err());

        let v: VectorRef = Arc::new(Int32Vector::from_slice([1, 2]));
        let expected = vec![RecordBatch::new(schema.clone(), vec![v.clone()]).unwrap()];
        let r = RecordBatches::try_from_columns(schema, vec![v]).unwrap();
        assert_eq!(r.take(), expected);
    }

    /// `try_new` fails on a schema mismatch, and the accessors and pretty
    /// printer work on a valid collection.
    #[test]
    fn test_recordbatches_try_new() {
        let column_a = ColumnSchema::new("a", ConcreteDataType::int32_datatype(), false);
        let column_b = ColumnSchema::new("b", ConcreteDataType::string_datatype(), false);
        let column_c = ColumnSchema::new("c", ConcreteDataType::boolean_datatype(), false);

        let va: VectorRef = Arc::new(Int32Vector::from_slice([1, 2]));
        let vb: VectorRef = Arc::new(StringVector::from(vec!["hello", "world"]));
        let vc: VectorRef = Arc::new(BooleanVector::from(vec![true, false]));

        let schema1 = Arc::new(Schema::new(vec![column_a.clone(), column_b]));
        let batch1 = RecordBatch::new(schema1.clone(), vec![va.clone(), vb]).unwrap();

        let schema2 = Arc::new(Schema::new(vec![column_a, column_c]));
        let batch2 = RecordBatch::new(schema2.clone(), vec![va, vc]).unwrap();

        let result = RecordBatches::try_new(schema1.clone(), vec![batch1.clone(), batch2]);
        assert!(result.is_err());
        assert_eq!(
            result.unwrap_err().to_string(),
            format!(
                "Failed to create RecordBatches, reason: expect RecordBatch schema equals {schema1:?}, actual: {schema2:?}",
            )
        );

        let batches = RecordBatches::try_new(schema1.clone(), vec![batch1.clone()]).unwrap();
        // Every cell is padded to the width of its column, so the header row
        // is as wide as the border. The table is built from separate lines so
        // the expectation does not depend on source indentation.
        let expected = [
            "+---+-------+",
            "| a | b     |",
            "+---+-------+",
            "| 1 | hello |",
            "| 2 | world |",
            "+---+-------+",
        ]
        .join("\n");
        assert_eq!(batches.pretty_print().unwrap(), expected);

        assert_eq!(schema1, batches.schema());
        assert_eq!(vec![batch1], batches.take());
    }

    /// `as_stream` replays every stored batch in order.
    #[tokio::test]
    async fn test_simple_recordbatch_stream() {
        let column_a = ColumnSchema::new("a", ConcreteDataType::int32_datatype(), false);
        let column_b = ColumnSchema::new("b", ConcreteDataType::string_datatype(), false);
        let schema = Arc::new(Schema::new(vec![column_a, column_b]));

        let va1: VectorRef = Arc::new(Int32Vector::from_slice([1, 2]));
        let vb1: VectorRef = Arc::new(StringVector::from(vec!["a", "b"]));
        let batch1 = RecordBatch::new(schema.clone(), vec![va1, vb1]).unwrap();

        let va2: VectorRef = Arc::new(Int32Vector::from_slice([3, 4, 5]));
        let vb2: VectorRef = Arc::new(StringVector::from(vec!["c", "d", "e"]));
        let batch2 = RecordBatch::new(schema.clone(), vec![va2, vb2]).unwrap();

        let recordbatches =
            RecordBatches::try_new(schema.clone(), vec![batch1.clone(), batch2.clone()]).unwrap();
        let stream = recordbatches.as_stream();
        let collected = util::collect(stream).await.unwrap();
        assert_eq!(collected.len(), 2);
        assert_eq!(collected[0], batch1);
        assert_eq!(collected[1], batch2);
    }
}