use std::fmt::{self, Display};
use std::future::Future;
use std::marker::PhantomData;
use std::pin::Pin;
use std::str::FromStr;
use std::sync::Arc;
use std::task::{Context, Poll};

use datafusion::arrow::compute::cast;
use datafusion::arrow::datatypes::SchemaRef as DfSchemaRef;
use datafusion::error::Result as DfResult;
use datafusion::execution::context::ExecutionProps;
use datafusion::logical_expr::utils::conjunction;
use datafusion::logical_expr::Expr;
use datafusion::physical_expr::create_physical_expr;
use datafusion::physical_plan::metrics::{BaselineMetrics, MetricValue};
use datafusion::physical_plan::{
    accept, DisplayFormatType, ExecutionPlan, ExecutionPlanVisitor, PhysicalExpr,
    RecordBatchStream as DfRecordBatchStream,
};
use datafusion_common::arrow::error::ArrowError;
use datafusion_common::{DataFusionError, ToDFSchema};
use datatypes::arrow::array::Array;
use datatypes::arrow::datatypes::DataType as ArrowDataType;
use datatypes::schema::{ColumnExtType, Schema, SchemaRef};
use futures::ready;
use jsonb;
use pin_project::pin_project;
use snafu::ResultExt;

use crate::error::{self, Result};
use crate::filter::batch_filter;
use crate::{
    DfRecordBatch, DfSendableRecordBatchStream, OrderOption, RecordBatch, RecordBatchStream,
    SendableRecordBatchStream, Stream,
};

type FutureStream =
    Pin<Box<dyn std::future::Future<Output = Result<SendableRecordBatchStream>> + Send>>;

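/// A [`Stream`] adapter that projects, casts, and optionally filters the
/// [`DfRecordBatch`]es produced by the inner stream so that they conform to
/// `projected_schema`.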
#[pin_project]
pub struct RecordBatchStreamTypeAdapter<T, E> {
    #[pin]
    stream: T,
    projected_schema: DfSchemaRef,
    projection: Vec<usize>,
    predicate: Option<Arc<dyn PhysicalExpr>>,
    phantom: PhantomData<E>,
}

impl<T, E> RecordBatchStreamTypeAdapter<T, E>
where
    T: Stream<Item = std::result::Result<DfRecordBatch, E>>,
    E: std::error::Error + Send + Sync + 'static,
{
    pub fn new(projected_schema: DfSchemaRef, stream: T, projection: Option<Vec<usize>>) -> Self {
        let projection = if let Some(projection) = projection {
            projection
        } else {
            (0..projected_schema.fields().len()).collect()
        };

        Self {
            stream,
            projected_schema,
            projection,
            predicate: None,
            phantom: Default::default(),
        }
    }

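    /// Conjoins `filters` into a single physical predicate and applies it to
    /// every batch this adapter yields; an empty `filters` leaves the stream
    /// unfiltered.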
    pub fn with_filter(mut self, filters: Vec<Expr>) -> Result<Self> {
        let filters = if let Some(expr) = conjunction(filters) {
            let df_schema = self
                .projected_schema
                .clone()
                .to_dfschema_ref()
                .context(error::PhysicalExprSnafu)?;

            let filters = create_physical_expr(&expr, &df_schema, &ExecutionProps::new())
                .context(error::PhysicalExprSnafu)?;
            Some(filters)
        } else {
            None
        };
        self.predicate = filters;
        Ok(self)
    }
}

impl<T, E> DfRecordBatchStream for RecordBatchStreamTypeAdapter<T, E>
where
    T: Stream<Item = std::result::Result<DfRecordBatch, E>>,
    E: std::error::Error + Send + Sync + 'static,
{
    fn schema(&self) -> DfSchemaRef {
        self.projected_schema.clone()
    }
}

impl<T, E> Stream for RecordBatchStreamTypeAdapter<T, E>
where
    T: Stream<Item = std::result::Result<DfRecordBatch, E>>,
    E: std::error::Error + Send + Sync + 'static,
{
    type Item = DfResult<DfRecordBatch>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.project();

        let batch = futures::ready!(this.stream.poll_next(cx))
            .map(|r| r.map_err(|e| DataFusionError::External(Box::new(e))));

        let projected_schema = this.projected_schema.clone();
        let projection = this.projection.clone();
        let predicate = this.predicate.clone();

        let batch = batch.map(|b| {
            b.and_then(|b| {
                let projected_column = b.project(&projection)?;
                if projected_column.schema().fields.len() != projected_schema.fields.len() {
                    return Err(DataFusionError::ArrowError(
                        Box::new(ArrowError::SchemaError(format!(
                            "Trying to cast a RecordBatch into an incompatible schema. RecordBatch: {}, Target: {}",
                            projected_column.schema(),
                            projected_schema,
                        ))),
                        None,
                    ));
                }

                let mut columns = Vec::with_capacity(projected_schema.fields.len());
                for (idx, field) in projected_schema.fields.iter().enumerate() {
                    let column = projected_column.column(idx);
                    let extype = field
                        .metadata()
                        .get("greptime:type")
                        .and_then(|s| ColumnExtType::from_str(s).ok());
                    let output = custom_cast(&column, field.data_type(), extype)?;
                    columns.push(output);
                }
                let record_batch = DfRecordBatch::try_new(projected_schema, columns)?;
                let record_batch = if let Some(predicate) = predicate {
                    batch_filter(&record_batch, &predicate)?
                } else {
                    record_batch
                };
                Ok(record_batch)
            })
        });

        Poll::Ready(batch)
    }

    #[inline]
    fn size_hint(&self) -> (usize, Option<usize>) {
        self.stream.size_hint()
    }
}

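/// Adapts a Greptime [`SendableRecordBatchStream`] into a DataFusion
/// [`DfRecordBatchStream`], boxing native errors as [`DataFusionError::External`].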
pub struct DfRecordBatchStreamAdapter {
    stream: SendableRecordBatchStream,
}

impl DfRecordBatchStreamAdapter {
    pub fn new(stream: SendableRecordBatchStream) -> Self {
        Self { stream }
    }
}

impl DfRecordBatchStream for DfRecordBatchStreamAdapter {
    fn schema(&self) -> DfSchemaRef {
        self.stream.schema().arrow_schema().clone()
    }
}

impl Stream for DfRecordBatchStreamAdapter {
    type Item = DfResult<DfRecordBatch>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        match Pin::new(&mut self.stream).poll_next(cx) {
            Poll::Pending => Poll::Pending,
            Poll::Ready(Some(recordbatch)) => match recordbatch {
                Ok(recordbatch) => Poll::Ready(Some(Ok(recordbatch.into_df_record_batch()))),
                Err(e) => Poll::Ready(Some(Err(DataFusionError::External(Box::new(e))))),
            },
            Poll::Ready(None) => Poll::Ready(None),
        }
    }

    #[inline]
    fn size_hint(&self) -> (usize, Option<usize>) {
        self.stream.size_hint()
    }
}

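/// The reverse of [`DfRecordBatchStreamAdapter`]: wraps a DataFusion
/// [`DfSendableRecordBatchStream`] as a Greptime [`RecordBatchStream`],
/// collecting plan execution metrics while the stream is polled.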
pub struct RecordBatchStreamAdapter {
    schema: SchemaRef,
    stream: DfSendableRecordBatchStream,
    metrics: Option<BaselineMetrics>,
    /// Aggregated metrics of the wrapped plan, resolved as the stream is polled.
    metrics_2: Metrics,
    /// Whether to render the plan display in verbose form when collecting metrics.
    explain_verbose: bool,
}

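/// Resolution state of the plan metrics carried by a [`RecordBatchStreamAdapter`].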
enum Metrics {
    /// No plan is attached, so no metrics will ever be available.
    Unavailable,
    /// A plan is attached but its metrics have not been collected yet.
    Unresolved(Arc<dyn ExecutionPlan>),
    /// Metrics collected while the stream is still producing batches.
    PartialResolved(Arc<dyn ExecutionPlan>, RecordBatchMetrics),
    /// Final metrics, collected after the stream was exhausted.
    Resolved(RecordBatchMetrics),
}

impl RecordBatchStreamAdapter {
    pub fn try_new(stream: DfSendableRecordBatchStream) -> Result<Self> {
        let schema =
            Arc::new(Schema::try_from(stream.schema()).context(error::SchemaConversionSnafu)?);
        Ok(Self {
            schema,
            stream,
            metrics: None,
            metrics_2: Metrics::Unavailable,
            explain_verbose: false,
        })
    }

    pub fn try_new_with_metrics_and_df_plan(
        stream: DfSendableRecordBatchStream,
        metrics: BaselineMetrics,
        df_plan: Arc<dyn ExecutionPlan>,
    ) -> Result<Self> {
        let schema =
            Arc::new(Schema::try_from(stream.schema()).context(error::SchemaConversionSnafu)?);
        Ok(Self {
            schema,
            stream,
            metrics: Some(metrics),
            metrics_2: Metrics::Unresolved(df_plan),
            explain_verbose: false,
        })
    }

    pub fn set_metrics2(&mut self, plan: Arc<dyn ExecutionPlan>) {
        self.metrics_2 = Metrics::Unresolved(plan)
    }

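    /// Sets whether collected metrics are rendered in verbose (`EXPLAIN
    /// VERBOSE`) form.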
    pub fn set_explain_verbose(&mut self, verbose: bool) {
        self.explain_verbose = verbose;
    }
}

impl RecordBatchStream for RecordBatchStreamAdapter {
    fn name(&self) -> &str {
        "RecordBatchStreamAdapter"
    }

    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

    fn metrics(&self) -> Option<RecordBatchMetrics> {
        match &self.metrics_2 {
            Metrics::Resolved(metrics) | Metrics::PartialResolved(_, metrics) => {
                Some(metrics.clone())
            }
            Metrics::Unavailable | Metrics::Unresolved(_) => None,
        }
    }

    fn output_ordering(&self) -> Option<&[OrderOption]> {
        None
    }
}

impl Stream for RecordBatchStreamAdapter {
    type Item = Result<RecordBatch>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let timer = self
            .metrics
            .as_ref()
            .map(|m| m.elapsed_compute().clone())
            .unwrap_or_default();
        let _guard = timer.timer();
        match Pin::new(&mut self.stream).poll_next(cx) {
            Poll::Pending => Poll::Pending,
            Poll::Ready(Some(df_record_batch)) => {
                let df_record_batch = df_record_batch?;
                if let Metrics::Unresolved(df_plan) | Metrics::PartialResolved(df_plan, _) =
                    &self.metrics_2
                {
                    let mut metric_collector = MetricCollector::new(self.explain_verbose);
                    // The visitor is infallible, so `accept` cannot fail here.
                    accept(df_plan.as_ref(), &mut metric_collector).unwrap();
                    self.metrics_2 = Metrics::PartialResolved(
                        df_plan.clone(),
                        metric_collector.record_batch_metrics,
                    );
                }
                Poll::Ready(Some(RecordBatch::try_from_df_record_batch(
                    self.schema(),
                    df_record_batch,
                )))
            }
            Poll::Ready(None) => {
                if let Metrics::Unresolved(df_plan) | Metrics::PartialResolved(df_plan, _) =
                    &self.metrics_2
                {
                    let mut metric_collector = MetricCollector::new(self.explain_verbose);
                    accept(df_plan.as_ref(), &mut metric_collector).unwrap();
                    self.metrics_2 = Metrics::Resolved(metric_collector.record_batch_metrics);
                }
                Poll::Ready(None)
            }
        }
    }

    #[inline]
    fn size_hint(&self) -> (usize, Option<usize>) {
        self.stream.size_hint()
    }
}

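/// An [`ExecutionPlanVisitor`] that walks a physical plan tree in pre-order
/// and gathers every node's metrics into a [`RecordBatchMetrics`].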
pub struct MetricCollector {
    current_level: usize,
    pub record_batch_metrics: RecordBatchMetrics,
    verbose: bool,
}

impl MetricCollector {
    pub fn new(verbose: bool) -> Self {
        Self {
            current_level: 0,
            record_batch_metrics: RecordBatchMetrics::default(),
            verbose,
        }
    }
}

impl ExecutionPlanVisitor for MetricCollector {
    type Error = !;

    fn pre_visit(&mut self, plan: &dyn ExecutionPlan) -> std::result::Result<bool, Self::Error> {
        // Record a placeholder entry for plan nodes that expose no metrics.
        let Some(metric) = plan.metrics() else {
            self.record_batch_metrics.plan_metrics.push(PlanMetrics {
                plan: plan.name().to_string(),
                level: self.current_level,
                metrics: vec![],
            });
            self.current_level += 1;
            return Ok(true);
        };

        let metric = metric
            .aggregate_by_name()
            .sorted_for_display()
            .timestamps_removed();
        let mut plan_metric = PlanMetrics {
            plan: one_line(plan, self.verbose).to_string(),
            level: self.current_level,
            metrics: Vec::with_capacity(metric.iter().size_hint().0),
        };
        for m in metric.iter() {
            plan_metric
                .metrics
                .push((m.value().name().to_string(), m.value().as_usize()));

            // Fold well-known metrics into the stream-level totals.
            match m.value() {
                MetricValue::ElapsedCompute(ec) => {
                    self.record_batch_metrics.elapsed_compute += ec.value()
                }
                MetricValue::CurrentMemoryUsage(m) => {
                    self.record_batch_metrics.memory_usage += m.value()
                }
                _ => {}
            }
        }
        self.record_batch_metrics.plan_metrics.push(plan_metric);

        self.current_level += 1;
        Ok(true)
    }

    fn post_visit(&mut self, _plan: &dyn ExecutionPlan) -> std::result::Result<bool, Self::Error> {
        self.current_level -= 1;
        Ok(true)
    }
}

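/// Renders a plan node on a single line, using the default or verbose
/// `EXPLAIN` display format.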
fn one_line(plan: &dyn ExecutionPlan, verbose: bool) -> impl fmt::Display + '_ {
    struct Wrapper<'a> {
        plan: &'a dyn ExecutionPlan,
        format_type: DisplayFormatType,
    }

    impl fmt::Display for Wrapper<'_> {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            self.plan.fmt_as(self.format_type, f)?;
            writeln!(f)
        }
    }

    let format_type = if verbose {
        DisplayFormatType::Verbose
    } else {
        DisplayFormatType::Default
    };
    Wrapper { plan, format_type }
}

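/// Aggregated execution metrics of a record batch stream, collected from the
/// underlying physical plan.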
#[derive(serde::Serialize, serde::Deserialize, Default, Debug, Clone)]
pub struct RecordBatchMetrics {
    /// Total elapsed compute time reported by the plan nodes.
    pub elapsed_compute: usize,
    /// Total current memory usage reported by the plan nodes.
    pub memory_usage: usize,
    /// Per-node metrics, in pre-order of the plan tree.
    pub plan_metrics: Vec<PlanMetrics>,
}

impl Display for RecordBatchMetrics {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        for metric in &self.plan_metrics {
            write!(
                f,
                "{:indent$}{} metrics=[",
                " ",
                metric.plan.trim_end(),
                indent = metric.level * 2,
            )?;
            for (label, value) in &metric.metrics {
                write!(f, "{}: {}, ", label, value)?;
            }
            writeln!(f, "]")?;
        }

        Ok(())
    }
}

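/// Metrics of a single plan node, tagged with its depth in the plan tree.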
#[derive(serde::Serialize, serde::Deserialize, Default, Debug, Clone)]
pub struct PlanMetrics {
    /// The one-line display of the plan node.
    pub plan: String,
    /// The depth of the node in the plan tree.
    pub level: usize,
    /// Metric name/value pairs of this node.
    pub metrics: Vec<(String, usize)>,
}

enum AsyncRecordBatchStreamAdapterState {
    Uninit(FutureStream),
    Ready(SendableRecordBatchStream),
    Failed,
}

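/// A [`RecordBatchStream`] that is lazily initialized from a [`FutureStream`]:
/// the future is polled first to obtain the inner stream, which is then driven
/// to completion. If initialization fails, the error is yielded once and the
/// stream ends.
///
/// A minimal usage sketch (`make_stream` is an illustrative async constructor,
/// not an API of this crate):
///
/// ```ignore
/// let future_stream: FutureStream = Box::pin(async move { make_stream().await });
/// let adapter = AsyncRecordBatchStreamAdapter::new(schema, future_stream);
/// let batches = RecordBatches::try_collect(Box::pin(adapter)).await?;
/// ```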
pub struct AsyncRecordBatchStreamAdapter {
    schema: SchemaRef,
    state: AsyncRecordBatchStreamAdapterState,
}

impl AsyncRecordBatchStreamAdapter {
    pub fn new(schema: SchemaRef, stream: FutureStream) -> Self {
        Self {
            schema,
            state: AsyncRecordBatchStreamAdapterState::Uninit(stream),
        }
    }
}

impl RecordBatchStream for AsyncRecordBatchStreamAdapter {
    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

    fn output_ordering(&self) -> Option<&[OrderOption]> {
        None
    }

    fn metrics(&self) -> Option<RecordBatchMetrics> {
        None
    }
}

impl Stream for AsyncRecordBatchStreamAdapter {
    type Item = Result<RecordBatch>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        loop {
            match &mut self.state {
                AsyncRecordBatchStreamAdapterState::Uninit(stream_future) => {
                    match ready!(Pin::new(stream_future).poll(cx)) {
                        Ok(stream) => {
                            self.state = AsyncRecordBatchStreamAdapterState::Ready(stream);
                            continue;
                        }
                        Err(e) => {
                            self.state = AsyncRecordBatchStreamAdapterState::Failed;
                            return Poll::Ready(Some(Err(e)));
                        }
                    };
                }
                AsyncRecordBatchStreamAdapterState::Ready(stream) => {
                    return Poll::Ready(ready!(Pin::new(stream).poll_next(cx)))
                }
                AsyncRecordBatchStreamAdapterState::Failed => return Poll::Ready(None),
            }
        }
    }

    // The inner stream may not be initialized yet, so no useful size hint is
    // available.
    #[inline]
    fn size_hint(&self) -> (usize, Option<usize>) {
        (0, None)
    }
}

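/// Casts `array` to `target_type`. A Map array cast to Binary is special-cased:
/// it is serialized as JSON text, or as JSONB when `extype` is
/// [`ColumnExtType::Json`]; everything else goes through the Arrow `cast` kernel.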
fn custom_cast(
    array: &dyn Array,
    target_type: &ArrowDataType,
    extype: Option<ColumnExtType>,
) -> std::result::Result<Arc<dyn Array>, ArrowError> {
    if let ArrowDataType::Map(_, _) = array.data_type() {
        if let ArrowDataType::Binary = target_type {
            return convert_map_to_json_binary(array, extype);
        }
    }

    cast(array, target_type)
}

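/// Converts a `MapArray` with string keys and string values into a
/// `BinaryArray` of serialized JSON objects. Null map slots stay null; null
/// values become JSON `null`.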
fn convert_map_to_json_binary(
    array: &dyn Array,
    extype: Option<ColumnExtType>,
) -> std::result::Result<Arc<dyn Array>, ArrowError> {
    use datatypes::arrow::array::{BinaryArray, MapArray};
    use serde_json::Value;

    let map_array = array
        .as_any()
        .downcast_ref::<MapArray>()
        .ok_or_else(|| ArrowError::CastError("Failed to downcast to MapArray".to_string()))?;

    let mut json_values = Vec::with_capacity(map_array.len());

    for i in 0..map_array.len() {
        if map_array.is_null(i) {
            json_values.push(None);
        } else {
            let map_entry = map_array.value(i);
            let key_value_array = map_entry
                .as_any()
                .downcast_ref::<datatypes::arrow::array::StructArray>()
                .ok_or_else(|| {
                    ArrowError::CastError("Failed to downcast to StructArray".to_string())
                })?;

            let mut json_obj = serde_json::Map::with_capacity(key_value_array.len());

            for j in 0..key_value_array.len() {
                if key_value_array.is_null(j) {
                    continue;
                }
                let key_field = key_value_array.column(0);
                let value_field = key_value_array.column(1);

                if key_field.is_null(j) {
                    continue;
                }

                let key = key_field
                    .as_any()
                    .downcast_ref::<datatypes::arrow::array::StringArray>()
                    .ok_or_else(|| {
                        ArrowError::CastError("Failed to downcast key to StringArray".to_string())
                    })?
                    .value(j);

                let value = if value_field.is_null(j) {
                    Value::Null
                } else {
                    let value_str = value_field
                        .as_any()
                        .downcast_ref::<datatypes::arrow::array::StringArray>()
                        .ok_or_else(|| {
                            ArrowError::CastError(
                                "Failed to downcast value to StringArray".to_string(),
                            )
                        })?
                        .value(j);
                    Value::String(value_str.to_string())
                };

                json_obj.insert(key.to_string(), value);
            }

            let json_value = Value::Object(json_obj);
            let json_bytes = match extype {
                Some(ColumnExtType::Json) => {
                    let json_string = serde_json::to_string(&json_value).map_err(|e| {
                        ArrowError::CastError(format!("Failed to serialize JSON: {}", e))
                    })?;
                    jsonb::parse_value(json_string.as_bytes())
                        .map(|jsonb_value| jsonb_value.to_vec())
                        .map_err(|e| {
                            ArrowError::CastError(format!("Failed to serialize JSONB: {}", e))
                        })?
                }
                _ => serde_json::to_vec(&json_value).map_err(|e| {
                    ArrowError::CastError(format!("Failed to serialize JSON: {}", e))
                })?,
            };
            json_values.push(Some(json_bytes));
        }
    }

    let binary_array = BinaryArray::from_iter(json_values);
    Ok(Arc::new(binary_array))
}

#[cfg(test)]
mod test {
    use common_error::ext::BoxedError;
    use common_error::mock::MockError;
    use common_error::status_code::StatusCode;
    use datatypes::arrow::array::{ArrayRef, MapArray, StringArray, StructArray};
    use datatypes::arrow::buffer::OffsetBuffer;
    use datatypes::arrow::datatypes::Field;
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::ColumnSchema;
    use datatypes::vectors::Int32Vector;
    use snafu::IntoError;

    use super::*;
    use crate::error::Error;
    use crate::RecordBatches;

    #[tokio::test]
    async fn test_async_recordbatch_stream_adaptor() {
        struct MaybeErrorRecordBatchStream {
            items: Vec<Result<RecordBatch>>,
        }

        impl RecordBatchStream for MaybeErrorRecordBatchStream {
            fn schema(&self) -> SchemaRef {
                unimplemented!()
            }

            fn output_ordering(&self) -> Option<&[OrderOption]> {
                None
            }

            fn metrics(&self) -> Option<RecordBatchMetrics> {
                None
            }
        }

        impl Stream for MaybeErrorRecordBatchStream {
            type Item = Result<RecordBatch>;

            fn poll_next(
                mut self: Pin<&mut Self>,
                _: &mut Context<'_>,
            ) -> Poll<Option<Self::Item>> {
                if let Some(batch) = self.items.pop() {
                    Poll::Ready(Some(Ok(batch?)))
                } else {
                    Poll::Ready(None)
                }
            }
        }

        fn new_future_stream(
            maybe_recordbatches: Result<Vec<Result<RecordBatch>>>,
        ) -> FutureStream {
            Box::pin(async move {
                maybe_recordbatches
                    .map(|items| Box::pin(MaybeErrorRecordBatchStream { items }) as _)
            })
        }

        let schema = Arc::new(Schema::new(vec![ColumnSchema::new(
            "a",
            ConcreteDataType::int32_datatype(),
            false,
        )]));
        let batch1 = RecordBatch::new(
            schema.clone(),
            vec![Arc::new(Int32Vector::from_slice([1])) as _],
        )
        .unwrap();
        let batch2 = RecordBatch::new(
            schema.clone(),
            vec![Arc::new(Int32Vector::from_slice([2])) as _],
        )
        .unwrap();

        let success_stream = new_future_stream(Ok(vec![Ok(batch1.clone()), Ok(batch2.clone())]));
        let adapter = AsyncRecordBatchStreamAdapter::new(schema.clone(), success_stream);
        let collected = RecordBatches::try_collect(Box::pin(adapter)).await.unwrap();
        // The mock stream pops from the back, so the batches come out reversed.
        assert_eq!(
            collected,
            RecordBatches::try_new(schema.clone(), vec![batch2.clone(), batch1.clone()]).unwrap()
        );

        let poll_err_stream = new_future_stream(Ok(vec![
            Ok(batch1.clone()),
            Err(error::ExternalSnafu
                .into_error(BoxedError::new(MockError::new(StatusCode::Unknown)))),
        ]));
        let adapter = AsyncRecordBatchStreamAdapter::new(schema.clone(), poll_err_stream);
        let err = RecordBatches::try_collect(Box::pin(adapter))
            .await
            .unwrap_err();
        assert!(
            matches!(err, Error::External { .. }),
            "unexpected err {err}"
        );

        let failed_to_init_stream = new_future_stream(Err(error::ExternalSnafu
            .into_error(BoxedError::new(MockError::new(StatusCode::Internal)))));
        let adapter = AsyncRecordBatchStreamAdapter::new(schema.clone(), failed_to_init_stream);
        let err = RecordBatches::try_collect(Box::pin(adapter))
            .await
            .unwrap_err();
        assert!(
            matches!(err, Error::External { .. }),
            "unexpected err {err}"
        );
    }

    #[test]
    fn test_convert_map_to_json_binary() {
        let keys = StringArray::from(vec![Some("a"), Some("b"), Some("c"), Some("x")]);
        let values = StringArray::from(vec![Some("1"), None, Some("3"), Some("42")]);
        let key_field = Arc::new(Field::new("key", ArrowDataType::Utf8, false));
        let value_field = Arc::new(Field::new("value", ArrowDataType::Utf8, true));
        let struct_type = ArrowDataType::Struct(vec![key_field, value_field].into());

        let entries_field = Arc::new(Field::new("entries", struct_type, false));

        let struct_array = StructArray::from(vec![
            (
                Arc::new(Field::new("key", ArrowDataType::Utf8, false)),
                Arc::new(keys) as ArrayRef,
            ),
            (
                Arc::new(Field::new("value", ArrowDataType::Utf8, true)),
                Arc::new(values) as ArrayRef,
            ),
        ]);

        // Three map slots: {"a","b","c"}, a null map, and {"x"}.
        let offsets = OffsetBuffer::from_lengths([3, 0, 1]);
        let nulls = datatypes::arrow::buffer::NullBuffer::from(vec![true, false, true]);

        let map_array = MapArray::new(
            entries_field,
            offsets,
            struct_array,
            Some(nulls),
            false, // keys are not sorted
        );

        let result = convert_map_to_json_binary(&map_array, None).unwrap();
        let binary_array = result
            .as_any()
            .downcast_ref::<datatypes::arrow::array::BinaryArray>()
            .unwrap();

        let expected_jsons = [
            Some(r#"{"a":"1","b":null,"c":"3"}"#),
            None,
            Some(r#"{"x":"42"}"#),
        ];

        for (i, expected) in expected_jsons.iter().enumerate() {
            if let Some(expected) = expected {
                assert!(!binary_array.is_null(i));
                let actual_bytes = binary_array.value(i);
                let actual_str = std::str::from_utf8(actual_bytes).unwrap();
                assert_eq!(actual_str, *expected);
            } else {
                assert!(binary_array.is_null(i));
            }
        }

        // With the JSON extension type the bytes are JSONB-encoded, so they
        // must differ from the plain JSON text.
        let result_json =
            convert_map_to_json_binary(&map_array, Some(ColumnExtType::Json)).unwrap();
        let binary_array_json = result_json
            .as_any()
            .downcast_ref::<datatypes::arrow::array::BinaryArray>()
            .unwrap();

        for (i, expected) in expected_jsons.iter().enumerate() {
            if expected.is_some() {
                assert!(!binary_array_json.is_null(i));
                let actual_bytes = binary_array_json.value(i);
                assert_ne!(actual_bytes, expected.unwrap().as_bytes());
            } else {
                assert!(binary_array_json.is_null(i));
            }
        }
    }
}