common_recordbatch/adapter.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::fmt::{self, Display};
use std::future::Future;
use std::marker::PhantomData;
use std::pin::Pin;
use std::str::FromStr;
use std::sync::Arc;
use std::task::{Context, Poll};

use datafusion::arrow::compute::cast;
use datafusion::arrow::datatypes::SchemaRef as DfSchemaRef;
use datafusion::error::Result as DfResult;
use datafusion::execution::context::ExecutionProps;
use datafusion::logical_expr::utils::conjunction;
use datafusion::logical_expr::Expr;
use datafusion::physical_expr::create_physical_expr;
use datafusion::physical_plan::metrics::{BaselineMetrics, MetricValue};
use datafusion::physical_plan::{
    accept, DisplayFormatType, ExecutionPlan, ExecutionPlanVisitor, PhysicalExpr,
    RecordBatchStream as DfRecordBatchStream,
};
use datafusion_common::arrow::error::ArrowError;
use datafusion_common::{DataFusionError, ToDFSchema};
use datatypes::arrow::array::Array;
use datatypes::arrow::datatypes::DataType as ArrowDataType;
use datatypes::schema::{ColumnExtType, Schema, SchemaRef};
use futures::ready;
use jsonb;
use pin_project::pin_project;
use snafu::ResultExt;

use crate::error::{self, Result};
use crate::filter::batch_filter;
use crate::{
    DfRecordBatch, DfSendableRecordBatchStream, OrderOption, RecordBatch, RecordBatchStream,
    SendableRecordBatchStream, Stream,
};

/// A future that resolves to a [SendableRecordBatchStream].
type FutureStream =
    Pin<Box<dyn std::future::Future<Output = Result<SendableRecordBatchStream>> + Send>>;

/// Casts the `RecordBatch`es of `stream` against the projected schema.
#[pin_project]
pub struct RecordBatchStreamTypeAdapter<T, E> {
    #[pin]
    stream: T,
    projected_schema: DfSchemaRef,
    projection: Vec<usize>,
    predicate: Option<Arc<dyn PhysicalExpr>>,
    phantom: PhantomData<E>,
}

impl<T, E> RecordBatchStreamTypeAdapter<T, E>
where
    T: Stream<Item = std::result::Result<DfRecordBatch, E>>,
    E: std::error::Error + Send + Sync + 'static,
{
    pub fn new(projected_schema: DfSchemaRef, stream: T, projection: Option<Vec<usize>>) -> Self {
        let projection = if let Some(projection) = projection {
            projection
        } else {
            (0..projected_schema.fields().len()).collect()
        };

        Self {
            stream,
            projected_schema,
            projection,
            predicate: None,
            phantom: Default::default(),
        }
    }

    pub fn with_filter(mut self, filters: Vec<Expr>) -> Result<Self> {
        let filters = if let Some(expr) = conjunction(filters) {
            let df_schema = self
                .projected_schema
                .clone()
                .to_dfschema_ref()
                .context(error::PhysicalExprSnafu)?;

            let filters = create_physical_expr(&expr, &df_schema, &ExecutionProps::new())
                .context(error::PhysicalExprSnafu)?;
            Some(filters)
        } else {
            None
        };
        self.predicate = filters;
        Ok(self)
    }
}

impl<T, E> DfRecordBatchStream for RecordBatchStreamTypeAdapter<T, E>
where
    T: Stream<Item = std::result::Result<DfRecordBatch, E>>,
    E: std::error::Error + Send + Sync + 'static,
{
    fn schema(&self) -> DfSchemaRef {
        self.projected_schema.clone()
    }
}

impl<T, E> Stream for RecordBatchStreamTypeAdapter<T, E>
where
    T: Stream<Item = std::result::Result<DfRecordBatch, E>>,
    E: std::error::Error + Send + Sync + 'static,
{
    type Item = DfResult<DfRecordBatch>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.project();

        let batch = futures::ready!(this.stream.poll_next(cx))
            .map(|r| r.map_err(|e| DataFusionError::External(Box::new(e))));

        let projected_schema = this.projected_schema.clone();
        let projection = this.projection.clone();
        let predicate = this.predicate.clone();

        let batch = batch.map(|b| {
            b.and_then(|b| {
                let projected_column = b.project(&projection)?;
                if projected_column.schema().fields.len() != projected_schema.fields.len() {
                    return Err(DataFusionError::ArrowError(Box::new(ArrowError::SchemaError(format!(
                        "Trying to cast a RecordBatch into an incompatible schema. RecordBatch: {}, Target: {}",
                        projected_column.schema(),
                        projected_schema,
                    ))), None));
                }

                // Cast each column to the type declared in the projected schema,
                // honoring the Greptime extension type (e.g. JSON) if present.
                let mut columns = Vec::with_capacity(projected_schema.fields.len());
                for (idx, field) in projected_schema.fields.iter().enumerate() {
                    let column = projected_column.column(idx);
                    let extype = field.metadata().get("greptime:type").and_then(|s| ColumnExtType::from_str(s).ok());
                    let output = custom_cast(&column, field.data_type(), extype)?;
                    columns.push(output);
                }
                let record_batch = DfRecordBatch::try_new(projected_schema, columns)?;
                // Apply the pushed-down predicate, if any, to the casted batch.
                let record_batch = if let Some(predicate) = predicate {
                    batch_filter(&record_batch, &predicate)?
                } else {
                    record_batch
                };
                Ok(record_batch)
            })
        });

        Poll::Ready(batch)
    }

    #[inline]
    fn size_hint(&self) -> (usize, Option<usize>) {
        self.stream.size_hint()
    }
}

/// Greptime SendableRecordBatchStream -> DataFusion RecordBatchStream.
/// The reverse one is [RecordBatchStreamAdapter].
pub struct DfRecordBatchStreamAdapter {
    stream: SendableRecordBatchStream,
}

impl DfRecordBatchStreamAdapter {
    pub fn new(stream: SendableRecordBatchStream) -> Self {
        Self { stream }
    }
}

impl DfRecordBatchStream for DfRecordBatchStreamAdapter {
    fn schema(&self) -> DfSchemaRef {
        self.stream.schema().arrow_schema().clone()
    }
}

impl Stream for DfRecordBatchStreamAdapter {
    type Item = DfResult<DfRecordBatch>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        match Pin::new(&mut self.stream).poll_next(cx) {
            Poll::Pending => Poll::Pending,
            Poll::Ready(Some(recordbatch)) => match recordbatch {
                Ok(recordbatch) => Poll::Ready(Some(Ok(recordbatch.into_df_record_batch()))),
                Err(e) => Poll::Ready(Some(Err(DataFusionError::External(Box::new(e))))),
            },
            Poll::Ready(None) => Poll::Ready(None),
        }
    }

    #[inline]
    fn size_hint(&self) -> (usize, Option<usize>) {
        self.stream.size_hint()
    }
}

/// DataFusion [SendableRecordBatchStream](DfSendableRecordBatchStream) -> Greptime [RecordBatchStream].
/// The reverse one is [DfRecordBatchStreamAdapter].
/// It can collect metrics from the DataFusion execution plan.
pub struct RecordBatchStreamAdapter {
    schema: SchemaRef,
    stream: DfSendableRecordBatchStream,
    metrics: Option<BaselineMetrics>,
    /// Aggregated plan-level metrics. Resolved after an [ExecutionPlan] is finished.
    metrics_2: Metrics,
    /// Display plan and metrics in verbose mode.
    explain_verbose: bool,
}

/// JSON-encoded metrics. Contains metrics from the whole plan tree.
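/// State transitions, driven by `RecordBatchStreamAdapter`'s `poll_next`: the
/// adapter starts as `Unavailable` or `Unresolved` depending on whether a plan
/// is attached, moves to `PartialResolved` after each polled batch while the
/// stream is running, and settles into `Resolved` once the stream is exhausted.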
enum Metrics {
    Unavailable,
    Unresolved(Arc<dyn ExecutionPlan>),
    PartialResolved(Arc<dyn ExecutionPlan>, RecordBatchMetrics),
    Resolved(RecordBatchMetrics),
}

impl RecordBatchStreamAdapter {
    pub fn try_new(stream: DfSendableRecordBatchStream) -> Result<Self> {
        let schema =
            Arc::new(Schema::try_from(stream.schema()).context(error::SchemaConversionSnafu)?);
        Ok(Self {
            schema,
            stream,
            metrics: None,
            metrics_2: Metrics::Unavailable,
            explain_verbose: false,
        })
    }

    pub fn try_new_with_metrics_and_df_plan(
        stream: DfSendableRecordBatchStream,
        metrics: BaselineMetrics,
        df_plan: Arc<dyn ExecutionPlan>,
    ) -> Result<Self> {
        let schema =
            Arc::new(Schema::try_from(stream.schema()).context(error::SchemaConversionSnafu)?);
        Ok(Self {
            schema,
            stream,
            metrics: Some(metrics),
            metrics_2: Metrics::Unresolved(df_plan),
            explain_verbose: false,
        })
    }

    pub fn set_metrics2(&mut self, plan: Arc<dyn ExecutionPlan>) {
        self.metrics_2 = Metrics::Unresolved(plan);
    }

    /// Set the verbose mode for displaying plan and metrics.
    pub fn set_explain_verbose(&mut self, verbose: bool) {
        self.explain_verbose = verbose;
    }
}

impl RecordBatchStream for RecordBatchStreamAdapter {
    fn name(&self) -> &str {
        "RecordBatchStreamAdapter"
    }

    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

    fn metrics(&self) -> Option<RecordBatchMetrics> {
        match &self.metrics_2 {
            Metrics::Resolved(metrics) | Metrics::PartialResolved(_, metrics) => {
                Some(metrics.clone())
            }
            Metrics::Unavailable | Metrics::Unresolved(_) => None,
        }
    }

    fn output_ordering(&self) -> Option<&[OrderOption]> {
        None
    }
}

impl Stream for RecordBatchStreamAdapter {
    type Item = Result<RecordBatch>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let timer = self
            .metrics
            .as_ref()
            .map(|m| m.elapsed_compute().clone())
            .unwrap_or_default();
        let _guard = timer.timer();
        match Pin::new(&mut self.stream).poll_next(cx) {
            Poll::Pending => Poll::Pending,
            Poll::Ready(Some(df_record_batch)) => {
                let df_record_batch = df_record_batch?;
                if let Metrics::Unresolved(df_plan) | Metrics::PartialResolved(df_plan, _) =
                    &self.metrics_2
                {
                    let mut metric_collector = MetricCollector::new(self.explain_verbose);
                    accept(df_plan.as_ref(), &mut metric_collector).unwrap();
                    self.metrics_2 = Metrics::PartialResolved(
                        df_plan.clone(),
                        metric_collector.record_batch_metrics,
                    );
                }
                Poll::Ready(Some(RecordBatch::try_from_df_record_batch(
                    self.schema(),
                    df_record_batch,
                )))
            }
            Poll::Ready(None) => {
                if let Metrics::Unresolved(df_plan) | Metrics::PartialResolved(df_plan, _) =
                    &self.metrics_2
                {
                    let mut metric_collector = MetricCollector::new(self.explain_verbose);
                    accept(df_plan.as_ref(), &mut metric_collector).unwrap();
                    self.metrics_2 = Metrics::Resolved(metric_collector.record_batch_metrics);
                }
                Poll::Ready(None)
            }
        }
    }

    #[inline]
    fn size_hint(&self) -> (usize, Option<usize>) {
        self.stream.size_hint()
    }
}

/// An [ExecutionPlanVisitor] to collect metrics from an [ExecutionPlan].
pub struct MetricCollector {
    current_level: usize,
    pub record_batch_metrics: RecordBatchMetrics,
    verbose: bool,
}

impl MetricCollector {
    pub fn new(verbose: bool) -> Self {
        Self {
            current_level: 0,
            record_batch_metrics: RecordBatchMetrics::default(),
            verbose,
        }
    }
}

impl ExecutionPlanVisitor for MetricCollector {
    type Error = !;

    fn pre_visit(&mut self, plan: &dyn ExecutionPlan) -> std::result::Result<bool, Self::Error> {
        // Record the plan with empty metrics if none are available, so the
        // node still shows up in the output tree.
        let Some(metric) = plan.metrics() else {
            self.record_batch_metrics.plan_metrics.push(PlanMetrics {
                plan: plan.name().to_string(),
                level: self.current_level,
                metrics: vec![],
            });
            self.current_level += 1;
            return Ok(true);
        };

        // scrape plan metrics
        let metric = metric
            .aggregate_by_name()
            .sorted_for_display()
            .timestamps_removed();
        let mut plan_metric = PlanMetrics {
            plan: one_line(plan, self.verbose).to_string(),
            level: self.current_level,
            metrics: Vec::with_capacity(metric.iter().size_hint().0),
        };
        for m in metric.iter() {
            plan_metric
                .metrics
                .push((m.value().name().to_string(), m.value().as_usize()));

            // aggregate high-level metrics
            match m.value() {
                MetricValue::ElapsedCompute(ec) => {
                    self.record_batch_metrics.elapsed_compute += ec.value()
                }
                MetricValue::CurrentMemoryUsage(m) => {
                    self.record_batch_metrics.memory_usage += m.value()
                }
                _ => {}
            }
        }
        self.record_batch_metrics.plan_metrics.push(plan_metric);

        self.current_level += 1;
        Ok(true)
    }

    fn post_visit(&mut self, _plan: &dyn ExecutionPlan) -> std::result::Result<bool, Self::Error> {
        self.current_level -= 1;
        Ok(true)
    }
}

/// Returns a single-line summary of the root of the plan.
/// If the `verbose` flag is set, it will display detailed information about the plan.
fn one_line(plan: &dyn ExecutionPlan, verbose: bool) -> impl fmt::Display + '_ {
    struct Wrapper<'a> {
        plan: &'a dyn ExecutionPlan,
        format_type: DisplayFormatType,
    }

    impl fmt::Display for Wrapper<'_> {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            self.plan.fmt_as(self.format_type, f)?;
            writeln!(f)
        }
    }

    let format_type = if verbose {
        DisplayFormatType::Verbose
    } else {
        DisplayFormatType::Default
    };
    Wrapper { plan, format_type }
}

/// [`RecordBatchMetrics`] carries metric values
/// from datanode to frontend through gRPC.
#[derive(serde::Serialize, serde::Deserialize, Default, Debug, Clone)]
pub struct RecordBatchMetrics {
    // High-level aggregated metrics
    /// CPU consumption in nanoseconds
    pub elapsed_compute: usize,
    /// Memory used by the plan in bytes
    pub memory_usage: usize,
    // Detailed per-plan metrics
    /// An ordered list of plan metrics, from the root to the leaves in pre-order.
    pub plan_metrics: Vec<PlanMetrics>,
}

/// Only displays `plan_metrics`, indenting each plan by two spaces per level.
impl Display for RecordBatchMetrics {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        for metric in &self.plan_metrics {
            write!(
                f,
                "{:indent$}{} metrics=[",
                " ",
                metric.plan.trim_end(),
                indent = metric.level * 2,
            )?;
            for (label, value) in &metric.metrics {
                write!(f, "{}: {}, ", label, value)?;
            }
            writeln!(f, "]")?;
        }

        Ok(())
    }
}

#[derive(serde::Serialize, serde::Deserialize, Default, Debug, Clone)]
pub struct PlanMetrics {
    /// The plan name
    pub plan: String,
    /// The level of the plan, starts from 0
    pub level: usize,
    /// An ordered key-value list of metrics.
    /// Key is metric label and value is metric value.
    pub metrics: Vec<(String, usize)>,
}

enum AsyncRecordBatchStreamAdapterState {
    Uninit(FutureStream),
    Ready(SendableRecordBatchStream),
    Failed,
}

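/// A [RecordBatchStream] whose inner stream is created lazily: the wrapped
/// future is polled first, and an error from it is surfaced as the first
/// stream item before the stream permanently ends.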
pub struct AsyncRecordBatchStreamAdapter {
    schema: SchemaRef,
    state: AsyncRecordBatchStreamAdapterState,
}

impl AsyncRecordBatchStreamAdapter {
    pub fn new(schema: SchemaRef, stream: FutureStream) -> Self {
        Self {
            schema,
            state: AsyncRecordBatchStreamAdapterState::Uninit(stream),
        }
    }
}

impl RecordBatchStream for AsyncRecordBatchStreamAdapter {
    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

    fn output_ordering(&self) -> Option<&[OrderOption]> {
        None
    }

    fn metrics(&self) -> Option<RecordBatchMetrics> {
        None
    }
}

impl Stream for AsyncRecordBatchStreamAdapter {
    type Item = Result<RecordBatch>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        loop {
            match &mut self.state {
                AsyncRecordBatchStreamAdapterState::Uninit(stream_future) => {
                    match ready!(Pin::new(stream_future).poll(cx)) {
                        Ok(stream) => {
                            self.state = AsyncRecordBatchStreamAdapterState::Ready(stream);
                            continue;
                        }
                        Err(e) => {
                            self.state = AsyncRecordBatchStreamAdapterState::Failed;
                            return Poll::Ready(Some(Err(e)));
                        }
                    };
                }
                AsyncRecordBatchStreamAdapterState::Ready(stream) => {
                    return Poll::Ready(ready!(Pin::new(stream).poll_next(cx)))
                }
                AsyncRecordBatchStreamAdapterState::Failed => return Poll::Ready(None),
            }
        }
    }

    // Size hint is not supported for a lazy stream: the inner stream may not
    // be initialized yet.
    #[inline]
    fn size_hint(&self) -> (usize, Option<usize>) {
        (0, None)
    }
}

/// Custom cast function that handles Map -> Binary (JSON) conversion
fn custom_cast(
    array: &dyn Array,
    target_type: &ArrowDataType,
    extype: Option<ColumnExtType>,
) -> std::result::Result<Arc<dyn Array>, ArrowError> {
    if let ArrowDataType::Map(_, _) = array.data_type() {
        if let ArrowDataType::Binary = target_type {
            return convert_map_to_json_binary(array, extype);
        }
    }

    cast(array, target_type)
}

/// Converts a Map array to a Binary array containing JSON data.
fn convert_map_to_json_binary(
    array: &dyn Array,
    extype: Option<ColumnExtType>,
) -> std::result::Result<Arc<dyn Array>, ArrowError> {
    use datatypes::arrow::array::{BinaryArray, MapArray};
    use serde_json::Value;

    let map_array = array
        .as_any()
        .downcast_ref::<MapArray>()
        .ok_or_else(|| ArrowError::CastError("Failed to downcast to MapArray".to_string()))?;

    let mut json_values = Vec::with_capacity(map_array.len());

    for i in 0..map_array.len() {
        if map_array.is_null(i) {
            json_values.push(None);
        } else {
            // Extract the map entry at index i
            let map_entry = map_array.value(i);
            let key_value_array = map_entry
                .as_any()
                .downcast_ref::<datatypes::arrow::array::StructArray>()
                .ok_or_else(|| {
                    ArrowError::CastError("Failed to downcast to StructArray".to_string())
                })?;

            // Convert to JSON object
            let mut json_obj = serde_json::Map::with_capacity(key_value_array.len());

            for j in 0..key_value_array.len() {
                if key_value_array.is_null(j) {
                    continue;
                }
                let key_field = key_value_array.column(0);
                let value_field = key_value_array.column(1);

                if key_field.is_null(j) {
                    continue;
                }

                let key = key_field
                    .as_any()
                    .downcast_ref::<datatypes::arrow::array::StringArray>()
                    .ok_or_else(|| {
                        ArrowError::CastError("Failed to downcast key to StringArray".to_string())
                    })?
                    .value(j);

                let value = if value_field.is_null(j) {
                    Value::Null
                } else {
                    let value_str = value_field
                        .as_any()
                        .downcast_ref::<datatypes::arrow::array::StringArray>()
                        .ok_or_else(|| {
                            ArrowError::CastError(
                                "Failed to downcast value to StringArray".to_string(),
                            )
                        })?
                        .value(j);
                    Value::String(value_str.to_string())
                };

                json_obj.insert(key.to_string(), value);
            }

            let json_value = Value::Object(json_obj);
            let json_bytes = match extype {
                Some(ColumnExtType::Json) => {
                    let json_string = match serde_json::to_string(&json_value) {
                        Ok(s) => s,
                        Err(e) => {
                            return Err(ArrowError::CastError(format!(
                                "Failed to serialize JSON: {}",
                                e
                            )))
                        }
                    };
                    match jsonb::parse_value(json_string.as_bytes()) {
                        Ok(jsonb_value) => jsonb_value.to_vec(),
                        Err(e) => {
                            return Err(ArrowError::CastError(format!(
                                "Failed to serialize JSONB: {}",
                                e
                            )))
                        }
                    }
                }
                _ => match serde_json::to_vec(&json_value) {
                    Ok(b) => b,
                    Err(e) => {
                        return Err(ArrowError::CastError(format!(
                            "Failed to serialize JSON: {}",
                            e
                        )))
                    }
                },
            };
            json_values.push(Some(json_bytes));
        }
    }

    let binary_array = BinaryArray::from_iter(json_values);
    Ok(Arc::new(binary_array))
}

#[cfg(test)]
mod test {
    use common_error::ext::BoxedError;
    use common_error::mock::MockError;
    use common_error::status_code::StatusCode;
    use datatypes::arrow::array::{ArrayRef, MapArray, StringArray, StructArray};
    use datatypes::arrow::buffer::OffsetBuffer;
    use datatypes::arrow::datatypes::Field;
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::ColumnSchema;
    use datatypes::vectors::Int32Vector;
    use snafu::IntoError;

    use super::*;
    use crate::error::Error;
    use crate::RecordBatches;

    #[tokio::test]
    async fn test_async_recordbatch_stream_adaptor() {
        struct MaybeErrorRecordBatchStream {
            items: Vec<Result<RecordBatch>>,
        }

        impl RecordBatchStream for MaybeErrorRecordBatchStream {
            fn schema(&self) -> SchemaRef {
                unimplemented!()
            }

            fn output_ordering(&self) -> Option<&[OrderOption]> {
                None
            }

            fn metrics(&self) -> Option<RecordBatchMetrics> {
                None
            }
        }

        impl Stream for MaybeErrorRecordBatchStream {
            type Item = Result<RecordBatch>;

            fn poll_next(
                mut self: Pin<&mut Self>,
                _: &mut Context<'_>,
            ) -> Poll<Option<Self::Item>> {
                if let Some(batch) = self.items.pop() {
                    Poll::Ready(Some(Ok(batch?)))
                } else {
                    Poll::Ready(None)
                }
            }
        }

        fn new_future_stream(
            maybe_recordbatches: Result<Vec<Result<RecordBatch>>>,
        ) -> FutureStream {
            Box::pin(async move {
                maybe_recordbatches
                    .map(|items| Box::pin(MaybeErrorRecordBatchStream { items }) as _)
            })
        }

        let schema = Arc::new(Schema::new(vec![ColumnSchema::new(
            "a",
            ConcreteDataType::int32_datatype(),
            false,
        )]));
        let batch1 = RecordBatch::new(
            schema.clone(),
            vec![Arc::new(Int32Vector::from_slice([1])) as _],
        )
        .unwrap();
        let batch2 = RecordBatch::new(
            schema.clone(),
            vec![Arc::new(Int32Vector::from_slice([2])) as _],
        )
        .unwrap();

        let success_stream = new_future_stream(Ok(vec![Ok(batch1.clone()), Ok(batch2.clone())]));
        let adapter = AsyncRecordBatchStreamAdapter::new(schema.clone(), success_stream);
        let collected = RecordBatches::try_collect(Box::pin(adapter)).await.unwrap();
        // `MaybeErrorRecordBatchStream` pops items from the back, so the
        // collected order is reversed.
        assert_eq!(
            collected,
            RecordBatches::try_new(schema.clone(), vec![batch2.clone(), batch1.clone()]).unwrap()
        );

        let poll_err_stream = new_future_stream(Ok(vec![
            Ok(batch1.clone()),
            Err(error::ExternalSnafu
                .into_error(BoxedError::new(MockError::new(StatusCode::Unknown)))),
        ]));
        let adapter = AsyncRecordBatchStreamAdapter::new(schema.clone(), poll_err_stream);
        let err = RecordBatches::try_collect(Box::pin(adapter))
            .await
            .unwrap_err();
        assert!(
            matches!(err, Error::External { .. }),
            "unexpected err {err}"
        );

        let failed_to_init_stream =
            new_future_stream(Err(error::ExternalSnafu
                .into_error(BoxedError::new(MockError::new(StatusCode::Internal)))));
        let adapter = AsyncRecordBatchStreamAdapter::new(schema.clone(), failed_to_init_stream);
        let err = RecordBatches::try_collect(Box::pin(adapter))
            .await
            .unwrap_err();
        assert!(
            matches!(err, Error::External { .. }),
            "unexpected err {err}"
        );
    }
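
    // Added sketch (not part of the original suite): pins down how
    // `RecordBatchMetrics` renders. Each plan line is padded to
    // `level * 2` characters and followed by its metric list; the plan
    // names below are arbitrary examples.
    #[test]
    fn test_record_batch_metrics_display() {
        let metrics = RecordBatchMetrics {
            elapsed_compute: 0,
            memory_usage: 0,
            plan_metrics: vec![
                PlanMetrics {
                    plan: "ProjectionExec".to_string(),
                    level: 0,
                    metrics: vec![("output_rows".to_string(), 3)],
                },
                PlanMetrics {
                    plan: "FilterExec".to_string(),
                    level: 1,
                    metrics: vec![],
                },
            ],
        };
        assert_eq!(
            metrics.to_string(),
            " ProjectionExec metrics=[output_rows: 3, ]\n  FilterExec metrics=[]\n"
        );
    }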
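    // Added sketch (not part of the original suite) of the happy path of
    // `RecordBatchStreamTypeAdapter`: batches are cast to the projected
    // schema as they flow through. The Int32 -> Int64 widening is an
    // arbitrary example pair.
    #[tokio::test]
    async fn test_record_batch_stream_type_adapter_casts() {
        use datatypes::arrow::array::{Int32Array, Int64Array};
        use datatypes::arrow::datatypes::Schema as ArrowSchema;
        use futures::TryStreamExt;

        // The source stream produces Int32 batches.
        let source_schema = Arc::new(ArrowSchema::new(vec![Field::new(
            "a",
            ArrowDataType::Int32,
            false,
        )]));
        let batch = DfRecordBatch::try_new(
            source_schema,
            vec![Arc::new(Int32Array::from(vec![1, 2, 3])) as _],
        )
        .unwrap();

        // The adapter's output schema asks for Int64 instead.
        let projected_schema = Arc::new(ArrowSchema::new(vec![Field::new(
            "a",
            ArrowDataType::Int64,
            false,
        )]));
        let stream = futures::stream::iter(vec![Ok::<_, std::convert::Infallible>(batch)]);
        let adapter = RecordBatchStreamTypeAdapter::new(projected_schema.clone(), stream, None);

        let batches: Vec<DfRecordBatch> = adapter.try_collect().await.unwrap();
        assert_eq!(batches.len(), 1);
        assert_eq!(batches[0].schema(), projected_schema);
        let column = batches[0]
            .column(0)
            .as_any()
            .downcast_ref::<Int64Array>()
            .unwrap();
        assert_eq!(column.value(0), 1);
        assert_eq!(column.value(2), 3);
    }
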
    #[test]
    fn test_convert_map_to_json_binary() {
        let keys = StringArray::from(vec![Some("a"), Some("b"), Some("c"), Some("x")]);
        let values = StringArray::from(vec![Some("1"), None, Some("3"), Some("42")]);
        let key_field = Arc::new(Field::new("key", ArrowDataType::Utf8, false));
        let value_field = Arc::new(Field::new("value", ArrowDataType::Utf8, true));
        let struct_type = ArrowDataType::Struct(vec![key_field, value_field].into());

        let entries_field = Arc::new(Field::new("entries", struct_type, false));

        let struct_array = StructArray::from(vec![
            (
                Arc::new(Field::new("key", ArrowDataType::Utf8, false)),
                Arc::new(keys) as ArrayRef,
            ),
            (
                Arc::new(Field::new("value", ArrowDataType::Utf8, true)),
                Arc::new(values) as ArrayRef,
            ),
        ]);

        let offsets = OffsetBuffer::from_lengths([3, 0, 1]);
        let nulls = datatypes::arrow::buffer::NullBuffer::from(vec![true, false, true]);

        let map_array = MapArray::new(entries_field, offsets, struct_array, Some(nulls), false);

        let result = convert_map_to_json_binary(&map_array, None).unwrap();
        let binary_array = result
            .as_any()
            .downcast_ref::<datatypes::arrow::array::BinaryArray>()
            .unwrap();

        let expected_jsons = [
            Some(r#"{"a":"1","b":null,"c":"3"}"#),
            None,
            Some(r#"{"x":"42"}"#),
        ];

        for (i, expected) in expected_jsons.iter().enumerate() {
            if let Some(expected) = expected {
                assert!(!binary_array.is_null(i));
                let actual_bytes = binary_array.value(i);
                let actual_str = std::str::from_utf8(actual_bytes).unwrap();
                assert_eq!(actual_str, *expected);
            } else {
                assert!(binary_array.is_null(i));
            }
        }

        // JSONB output is a binary encoding, so it must differ from the plain
        // JSON text bytes.
        let result_json =
            convert_map_to_json_binary(&map_array, Some(ColumnExtType::Json)).unwrap();
        let binary_array_json = result_json
            .as_any()
            .downcast_ref::<datatypes::arrow::array::BinaryArray>()
            .unwrap();

        for (i, expected) in expected_jsons.iter().enumerate() {
            if let Some(expected) = expected {
                assert!(!binary_array_json.is_null(i));
                let actual_bytes = binary_array_json.value(i);
                assert_ne!(actual_bytes, expected.as_bytes());
            } else {
                assert!(binary_array_json.is_null(i));
            }
        }
    }
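
    // Added sketch (not part of the original suite): for non-Map inputs,
    // `custom_cast` should defer to arrow's standard `cast` kernel.
    #[test]
    fn test_custom_cast_falls_back_to_arrow_cast() {
        use datatypes::arrow::array::{Int32Array, Int64Array};

        let input = Int32Array::from(vec![1, 2, 3]);
        let output = custom_cast(&input, &ArrowDataType::Int64, None).unwrap();
        let output = output.as_any().downcast_ref::<Int64Array>().unwrap();
        assert_eq!(output.value(0), 1);
        assert_eq!(output.value(2), 3);
    }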
}