1#![feature(never_type)]
16
17pub mod adapter;
18pub mod cursor;
19pub mod error;
20pub mod filter;
21mod recordbatch;
22pub mod util;
23
24use std::pin::Pin;
25use std::sync::Arc;
26
27use adapter::RecordBatchMetrics;
28use arc_swap::ArcSwapOption;
29pub use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
30use datatypes::arrow::compute::SortOptions;
31pub use datatypes::arrow::record_batch::RecordBatch as DfRecordBatch;
32use datatypes::arrow::util::pretty;
33use datatypes::prelude::VectorRef;
34use datatypes::schema::{Schema, SchemaRef};
35use error::Result;
36use futures::task::{Context, Poll};
37use futures::{Stream, TryStreamExt};
38pub use recordbatch::RecordBatch;
39use snafu::{ensure, ResultExt};
40
/// A [`Stream`] yielding `Result<RecordBatch>` items that also exposes
/// metadata about itself: a name, a schema, an optional output ordering and
/// optional metrics.
pub trait RecordBatchStream: Stream<Item = Result<RecordBatch>> {
    /// Returns a name identifying this stream implementation.
    ///
    /// The default implementation returns `"RecordBatchStream"`; implementors
    /// may override it.
    fn name(&self) -> &str {
        "RecordBatchStream"
    }

    /// Returns the schema associated with this stream.
    fn schema(&self) -> SchemaRef;

    /// Returns the output ordering as a slice of [`OrderOption`]s, if any.
    // NOTE(review): presumably `None` means "no ordering known/guaranteed" — confirm.
    fn output_ordering(&self) -> Option<&[OrderOption]>;

    /// Returns the metrics recorded for this stream, if any are available.
    fn metrics(&self) -> Option<RecordBatchMetrics>;
}
52
/// A pinned, boxed [`RecordBatchStream`] trait object that is also `Send`.
pub type SendableRecordBatchStream = Pin<Box<dyn RecordBatchStream + Send>>;
54
/// One entry of a stream's output ordering, as returned by
/// [`RecordBatchStream::output_ordering`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct OrderOption {
    /// Name this ordering entry applies to.
    // NOTE(review): presumably a column name in the stream's schema — confirm.
    pub name: String,
    /// Arrow sort options attached to this entry.
    pub options: SortOptions,
}
60
/// A [`RecordBatchStream`] that carries a schema but never yields a batch:
/// its `poll_next` completes immediately with `None`.
pub struct EmptyRecordBatchStream {
    /// Schema reported by [`RecordBatchStream::schema`].
    schema: SchemaRef,
}
67
68impl EmptyRecordBatchStream {
69 pub fn new(schema: SchemaRef) -> Self {
71 Self { schema }
72 }
73}
74
75impl RecordBatchStream for EmptyRecordBatchStream {
76 fn schema(&self) -> SchemaRef {
77 self.schema.clone()
78 }
79
80 fn output_ordering(&self) -> Option<&[OrderOption]> {
81 None
82 }
83
84 fn metrics(&self) -> Option<RecordBatchMetrics> {
85 None
86 }
87}
88
89impl Stream for EmptyRecordBatchStream {
90 type Item = Result<RecordBatch>;
91
92 fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
93 Poll::Ready(None)
94 }
95}
96
/// An in-memory collection of [`RecordBatch`]es together with a schema.
///
/// [`RecordBatches::try_new`] rejects any batch whose schema differs from
/// `schema`.
#[derive(Debug, PartialEq)]
pub struct RecordBatches {
    /// Schema reported by [`RecordBatches::schema`].
    schema: SchemaRef,
    /// The batches, in the order they were supplied or collected.
    batches: Vec<RecordBatch>,
}
102
103impl RecordBatches {
104 pub fn try_from_columns<I: IntoIterator<Item = VectorRef>>(
105 schema: SchemaRef,
106 columns: I,
107 ) -> Result<Self> {
108 let batches = vec![RecordBatch::new(schema.clone(), columns)?];
109 Ok(Self { schema, batches })
110 }
111
112 pub async fn try_collect(stream: SendableRecordBatchStream) -> Result<Self> {
113 let schema = stream.schema();
114 let batches = stream.try_collect::<Vec<_>>().await?;
115 Ok(Self { schema, batches })
116 }
117
118 #[inline]
119 pub fn empty() -> Self {
120 Self {
121 schema: Arc::new(Schema::new(vec![])),
122 batches: vec![],
123 }
124 }
125
126 pub fn iter(&self) -> impl Iterator<Item = &RecordBatch> {
127 self.batches.iter()
128 }
129
130 pub fn pretty_print(&self) -> Result<String> {
131 let df_batches = &self
132 .iter()
133 .map(|x| x.df_record_batch().clone())
134 .collect::<Vec<_>>();
135 let result = pretty::pretty_format_batches(df_batches).context(error::FormatSnafu)?;
136
137 Ok(result.to_string())
138 }
139
140 pub fn try_new(schema: SchemaRef, batches: Vec<RecordBatch>) -> Result<Self> {
141 for batch in &batches {
142 ensure!(
143 batch.schema == schema,
144 error::CreateRecordBatchesSnafu {
145 reason: format!(
146 "expect RecordBatch schema equals {:?}, actual: {:?}",
147 schema, batch.schema
148 )
149 }
150 )
151 }
152 Ok(Self { schema, batches })
153 }
154
155 pub fn schema(&self) -> SchemaRef {
156 self.schema.clone()
157 }
158
159 pub fn take(self) -> Vec<RecordBatch> {
160 self.batches
161 }
162
163 pub fn as_stream(&self) -> SendableRecordBatchStream {
164 Box::pin(SimpleRecordBatchStream {
165 inner: RecordBatches {
166 schema: self.schema(),
167 batches: self.batches.clone(),
168 },
169 index: 0,
170 })
171 }
172}
173
174impl IntoIterator for RecordBatches {
175 type Item = RecordBatch;
176 type IntoIter = std::vec::IntoIter<Self::Item>;
177
178 fn into_iter(self) -> Self::IntoIter {
179 self.batches.into_iter()
180 }
181}
182
/// A [`RecordBatchStream`] that replays the batches of a [`RecordBatches`]
/// one at a time.
pub struct SimpleRecordBatchStream {
    /// The batches to yield, plus the schema to report.
    inner: RecordBatches,
    /// Index into `inner.batches` of the next batch to yield.
    index: usize,
}
187
188impl RecordBatchStream for SimpleRecordBatchStream {
189 fn schema(&self) -> SchemaRef {
190 self.inner.schema()
191 }
192
193 fn output_ordering(&self) -> Option<&[OrderOption]> {
194 None
195 }
196
197 fn metrics(&self) -> Option<RecordBatchMetrics> {
198 None
199 }
200}
201
202impl Stream for SimpleRecordBatchStream {
203 type Item = Result<RecordBatch>;
204
205 fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
206 Poll::Ready(if self.index < self.inner.batches.len() {
207 let batch = self.inner.batches[self.index].clone();
208 self.index += 1;
209 Some(Ok(batch))
210 } else {
211 None
212 })
213 }
214}
215
/// Adapts any stream `S` into a [`RecordBatchStream`] by pairing it with a
/// schema, an optional output ordering and a shared metrics slot.
pub struct RecordBatchStreamWrapper<S> {
    /// Schema reported by [`RecordBatchStream::schema`].
    pub schema: SchemaRef,
    /// The wrapped stream; polling the wrapper polls this stream.
    pub stream: S,
    /// Ordering reported by [`RecordBatchStream::output_ordering`], if any.
    pub output_ordering: Option<Vec<OrderOption>>,
    /// Shared, swappable slot holding the metrics returned by
    /// [`RecordBatchStream::metrics`]; empty until something stores a value.
    pub metrics: Arc<ArcSwapOption<RecordBatchMetrics>>,
}
223
224impl<S> RecordBatchStreamWrapper<S> {
225 pub fn new(schema: SchemaRef, stream: S) -> RecordBatchStreamWrapper<S> {
227 RecordBatchStreamWrapper {
228 schema,
229 stream,
230 output_ordering: None,
231 metrics: Default::default(),
232 }
233 }
234}
235
236impl<S: Stream<Item = Result<RecordBatch>> + Unpin> RecordBatchStream
237 for RecordBatchStreamWrapper<S>
238{
239 fn name(&self) -> &str {
240 "RecordBatchStreamWrapper"
241 }
242
243 fn schema(&self) -> SchemaRef {
244 self.schema.clone()
245 }
246
247 fn output_ordering(&self) -> Option<&[OrderOption]> {
248 self.output_ordering.as_deref()
249 }
250
251 fn metrics(&self) -> Option<RecordBatchMetrics> {
252 self.metrics.load().as_ref().map(|s| s.as_ref().clone())
253 }
254}
255
256impl<S: Stream<Item = Result<RecordBatch>> + Unpin> Stream for RecordBatchStreamWrapper<S> {
257 type Item = Result<RecordBatch>;
258
259 fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
260 Pin::new(&mut self.stream).poll_next(ctx)
261 }
262}
263
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use datatypes::prelude::{ConcreteDataType, VectorRef};
    use datatypes::schema::{ColumnSchema, Schema};
    use datatypes::vectors::{BooleanVector, Int32Vector, StringVector};

    use super::*;

    /// `try_from_columns` rejects columns that do not match the schema and
    /// wraps matching columns into a single batch.
    #[test]
    fn test_recordbatches_try_from_columns() {
        let schema = Arc::new(Schema::new(vec![ColumnSchema::new(
            "a",
            ConcreteDataType::int32_datatype(),
            false,
        )]));
        // A string column against an int32 schema must fail.
        let result = RecordBatches::try_from_columns(
            schema.clone(),
            vec![Arc::new(StringVector::from(vec!["hello", "world"])) as _],
        );
        assert!(result.is_err());

        let v: VectorRef = Arc::new(Int32Vector::from_slice([1, 2]));
        let expected = vec![RecordBatch::new(schema.clone(), vec![v.clone()]).unwrap()];
        let r = RecordBatches::try_from_columns(schema, vec![v]).unwrap();
        assert_eq!(r.take(), expected);
    }

    /// `try_new` rejects batches with a differing schema, and a valid
    /// collection pretty-prints, reports its schema and hands back its batches.
    #[test]
    fn test_recordbatches_try_new() {
        let column_a = ColumnSchema::new("a", ConcreteDataType::int32_datatype(), false);
        let column_b = ColumnSchema::new("b", ConcreteDataType::string_datatype(), false);
        let column_c = ColumnSchema::new("c", ConcreteDataType::boolean_datatype(), false);

        let va: VectorRef = Arc::new(Int32Vector::from_slice([1, 2]));
        let vb: VectorRef = Arc::new(StringVector::from(vec!["hello", "world"]));
        let vc: VectorRef = Arc::new(BooleanVector::from(vec![true, false]));

        let schema1 = Arc::new(Schema::new(vec![column_a.clone(), column_b]));
        let batch1 = RecordBatch::new(schema1.clone(), vec![va.clone(), vb]).unwrap();

        let schema2 = Arc::new(Schema::new(vec![column_a, column_c]));
        let batch2 = RecordBatch::new(schema2.clone(), vec![va, vc]).unwrap();

        let result = RecordBatches::try_new(schema1.clone(), vec![batch1.clone(), batch2]);
        assert!(result.is_err());
        assert_eq!(
            result.unwrap_err().to_string(),
            format!(
                "Failed to create RecordBatches, reason: expect RecordBatch schema equals {schema1:?}, actual: {schema2:?}",
            )
        );

        let batches = RecordBatches::try_new(schema1.clone(), vec![batch1.clone()]).unwrap();
        // The pretty printer pads every cell to its column's widest value, so
        // the header cell for `b` is padded to the width of "hello".
        let expected = "\
+---+-------+
| a | b     |
+---+-------+
| 1 | hello |
| 2 | world |
+---+-------+";
        assert_eq!(batches.pretty_print().unwrap(), expected);

        assert_eq!(schema1, batches.schema());
        assert_eq!(vec![batch1], batches.take());
    }

    /// `as_stream` yields every stored batch, in order.
    #[tokio::test]
    async fn test_simple_recordbatch_stream() {
        let column_a = ColumnSchema::new("a", ConcreteDataType::int32_datatype(), false);
        let column_b = ColumnSchema::new("b", ConcreteDataType::string_datatype(), false);
        let schema = Arc::new(Schema::new(vec![column_a, column_b]));

        let va1: VectorRef = Arc::new(Int32Vector::from_slice([1, 2]));
        let vb1: VectorRef = Arc::new(StringVector::from(vec!["a", "b"]));
        let batch1 = RecordBatch::new(schema.clone(), vec![va1, vb1]).unwrap();

        let va2: VectorRef = Arc::new(Int32Vector::from_slice([3, 4, 5]));
        let vb2: VectorRef = Arc::new(StringVector::from(vec!["c", "d", "e"]));
        let batch2 = RecordBatch::new(schema.clone(), vec![va2, vb2]).unwrap();

        let recordbatches =
            RecordBatches::try_new(schema.clone(), vec![batch1.clone(), batch2.clone()]).unwrap();
        let stream = recordbatches.as_stream();
        let collected = util::collect(stream).await.unwrap();
        assert_eq!(collected.len(), 2);
        assert_eq!(collected[0], batch1);
        assert_eq!(collected[1], batch2);
    }
}