1use std::fmt::{self, Display};
16use std::future::Future;
17use std::marker::PhantomData;
18use std::pin::Pin;
19use std::str::FromStr;
20use std::sync::Arc;
21use std::task::{Context, Poll};
22
23use common_base::readable_size::ReadableSize;
24use common_telemetry::tracing::{Span, info_span};
25use common_time::util::format_nanoseconds_human_readable;
26use datafusion::arrow::compute::cast;
27use datafusion::arrow::datatypes::SchemaRef as DfSchemaRef;
28use datafusion::error::Result as DfResult;
29use datafusion::execution::context::ExecutionProps;
30use datafusion::logical_expr::Expr;
31use datafusion::logical_expr::utils::conjunction;
32use datafusion::physical_expr::create_physical_expr;
33use datafusion::physical_plan::metrics::{BaselineMetrics, MetricValue};
34use datafusion::physical_plan::{
35 DisplayFormatType, ExecutionPlan, ExecutionPlanVisitor, PhysicalExpr,
36 RecordBatchStream as DfRecordBatchStream, accept,
37};
38use datafusion_common::arrow::error::ArrowError;
39use datafusion_common::{DataFusionError, ToDFSchema};
40use datatypes::arrow::array::Array;
41use datatypes::arrow::datatypes::DataType as ArrowDataType;
42use datatypes::schema::{ColumnExtType, Schema, SchemaRef};
43use futures::ready;
44use jsonb;
45use pin_project::pin_project;
46use snafu::ResultExt;
47
48use crate::error::{self, Result};
49use crate::filter::batch_filter;
50use crate::{
51 DfRecordBatch, DfSendableRecordBatchStream, OrderOption, RecordBatch, RecordBatchStream,
52 SendableRecordBatchStream, Stream,
53};
54
/// Boxed future resolving to a [`SendableRecordBatchStream`]; lets stream
/// construction be deferred until first poll (see `AsyncRecordBatchStreamAdapter`).
type FutureStream =
    Pin<Box<dyn std::future::Future<Output = Result<SendableRecordBatchStream>> + Send>>;
57
/// Adapts a stream of Arrow [`DfRecordBatch`]es with an arbitrary error type `E`
/// into a DataFusion record batch stream, applying projection, per-column
/// casting, and an optional filter to every batch.
#[pin_project]
pub struct RecordBatchStreamTypeAdapter<T, E> {
    /// The wrapped input stream.
    #[pin]
    stream: T,
    /// Schema after projection; output batches are cast to match it.
    projected_schema: DfSchemaRef,
    /// Column indices to keep from each input batch.
    projection: Vec<usize>,
    /// Optional physical filter expression applied after casting.
    predicate: Option<Arc<dyn PhysicalExpr>>,
    /// Carries the error type `E` without storing a value of it.
    phantom: PhantomData<E>,
}
68
69impl<T, E> RecordBatchStreamTypeAdapter<T, E>
70where
71 T: Stream<Item = std::result::Result<DfRecordBatch, E>>,
72 E: std::error::Error + Send + Sync + 'static,
73{
74 pub fn new(projected_schema: DfSchemaRef, stream: T, projection: Option<Vec<usize>>) -> Self {
75 let projection = if let Some(projection) = projection {
76 projection
77 } else {
78 (0..projected_schema.fields().len()).collect()
79 };
80
81 Self {
82 stream,
83 projected_schema,
84 projection,
85 predicate: None,
86 phantom: Default::default(),
87 }
88 }
89
90 pub fn with_filter(mut self, filters: Vec<Expr>) -> Result<Self> {
91 let filters = if let Some(expr) = conjunction(filters) {
92 let df_schema = self
93 .projected_schema
94 .clone()
95 .to_dfschema_ref()
96 .context(error::PhysicalExprSnafu)?;
97
98 let filters = create_physical_expr(&expr, &df_schema, &ExecutionProps::new())
99 .context(error::PhysicalExprSnafu)?;
100 Some(filters)
101 } else {
102 None
103 };
104 self.predicate = filters;
105 Ok(self)
106 }
107}
108
impl<T, E> DfRecordBatchStream for RecordBatchStreamTypeAdapter<T, E>
where
    T: Stream<Item = std::result::Result<DfRecordBatch, E>>,
    E: std::error::Error + Send + Sync + 'static,
{
    /// Returns the projected (output) schema, not the inner stream's schema.
    fn schema(&self) -> DfSchemaRef {
        self.projected_schema.clone()
    }
}
118
119impl<T, E> Stream for RecordBatchStreamTypeAdapter<T, E>
120where
121 T: Stream<Item = std::result::Result<DfRecordBatch, E>>,
122 E: std::error::Error + Send + Sync + 'static,
123{
124 type Item = DfResult<DfRecordBatch>;
125
126 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
127 let this = self.project();
128
129 let batch = futures::ready!(this.stream.poll_next(cx))
130 .map(|r| r.map_err(|e| DataFusionError::External(Box::new(e))));
131
132 let projected_schema = this.projected_schema.clone();
133 let projection = this.projection.clone();
134 let predicate = this.predicate.clone();
135
136 let batch = batch.map(|b| {
137 b.and_then(|b| {
138 let projected_column = b.project(&projection)?;
139 if projected_column.schema().fields.len() != projected_schema.fields.len() {
140 return Err(DataFusionError::ArrowError(Box::new(ArrowError::SchemaError(format!(
141 "Trying to cast a RecordBatch into an incompatible schema. RecordBatch: {}, Target: {}",
142 projected_column.schema(),
143 projected_schema,
144 ))), None));
145 }
146
147 let mut columns = Vec::with_capacity(projected_schema.fields.len());
148 for (idx,field) in projected_schema.fields.iter().enumerate() {
149 let column = projected_column.column(idx);
150 let extype = field.metadata().get("greptime:type").and_then(|s| ColumnExtType::from_str(s).ok());
151 let output = custom_cast(&column, field.data_type(), extype)?;
152 columns.push(output)
153 }
154 let record_batch = DfRecordBatch::try_new(projected_schema, columns)?;
155 let record_batch = if let Some(predicate) = predicate {
156 batch_filter(&record_batch, &predicate)?
157 } else {
158 record_batch
159 };
160 Ok(record_batch)
161 })
162 });
163
164 Poll::Ready(batch)
165 }
166
167 #[inline]
168 fn size_hint(&self) -> (usize, Option<usize>) {
169 self.stream.size_hint()
170 }
171}
172
/// Wraps a Greptime [`SendableRecordBatchStream`] so it can be consumed as a
/// DataFusion record batch stream.
pub struct DfRecordBatchStreamAdapter {
    stream: SendableRecordBatchStream,
}

impl DfRecordBatchStreamAdapter {
    /// Creates an adapter over `stream`.
    pub fn new(stream: SendableRecordBatchStream) -> Self {
        Self { stream }
    }
}

impl DfRecordBatchStream for DfRecordBatchStreamAdapter {
    /// Returns the Arrow view of the wrapped stream's schema.
    fn schema(&self) -> DfSchemaRef {
        self.stream.schema().arrow_schema().clone()
    }
}
190
191impl Stream for DfRecordBatchStreamAdapter {
192 type Item = DfResult<DfRecordBatch>;
193
194 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
195 match Pin::new(&mut self.stream).poll_next(cx) {
196 Poll::Pending => Poll::Pending,
197 Poll::Ready(Some(recordbatch)) => match recordbatch {
198 Ok(recordbatch) => Poll::Ready(Some(Ok(recordbatch.into_df_record_batch()))),
199 Err(e) => Poll::Ready(Some(Err(DataFusionError::External(Box::new(e))))),
200 },
201 Poll::Ready(None) => Poll::Ready(None),
202 }
203 }
204
205 #[inline]
206 fn size_hint(&self) -> (usize, Option<usize>) {
207 self.stream.size_hint()
208 }
209}
210
/// Wraps a DataFusion record batch stream and exposes it as a Greptime
/// [`RecordBatchStream`], optionally collecting plan execution metrics and
/// tracing each poll under a span.
pub struct RecordBatchStreamAdapter {
    schema: SchemaRef,
    stream: DfSendableRecordBatchStream,
    /// Baseline metrics used to time `poll_next`; when `None`, a throwaway
    /// default timer is used instead.
    metrics: Option<BaselineMetrics>,
    /// Plan-metrics state machine; see [`Metrics`].
    metrics_2: Metrics,
    /// Whether plans are rendered with verbose (`EXPLAIN` verbose) formatting.
    explain_verbose: bool,
    /// Tracing span under which `poll_next` calls are recorded.
    span: Span,
}

/// State machine for collecting metrics out of the underlying physical plan.
enum Metrics {
    /// No plan attached; metrics can never be resolved.
    Unavailable,
    /// Plan attached but no batch polled yet.
    Unresolved(Arc<dyn ExecutionPlan>),
    /// Stream still running; holds the metrics collected so far.
    PartialResolved(Arc<dyn ExecutionPlan>, RecordBatchMetrics),
    /// Stream exhausted; holds the final metrics.
    Resolved(RecordBatchMetrics),
}
232
233impl RecordBatchStreamAdapter {
234 pub fn try_new(stream: DfSendableRecordBatchStream) -> Result<Self> {
235 let schema =
236 Arc::new(Schema::try_from(stream.schema()).context(error::SchemaConversionSnafu)?);
237 Ok(Self {
238 schema,
239 stream,
240 metrics: None,
241 metrics_2: Metrics::Unavailable,
242 explain_verbose: false,
243 span: Span::current(),
244 })
245 }
246
247 pub fn try_new_with_span(stream: DfSendableRecordBatchStream, span: Span) -> Result<Self> {
248 let schema =
249 Arc::new(Schema::try_from(stream.schema()).context(error::SchemaConversionSnafu)?);
250 let subspan = info_span!(parent: &span, "RecordBatchStreamAdapter");
251 Ok(Self {
252 schema,
253 stream,
254 metrics: None,
255 metrics_2: Metrics::Unavailable,
256 explain_verbose: false,
257 span: subspan,
258 })
259 }
260
261 pub fn set_metrics2(&mut self, plan: Arc<dyn ExecutionPlan>) {
262 self.metrics_2 = Metrics::Unresolved(plan)
263 }
264
265 pub fn set_explain_verbose(&mut self, verbose: bool) {
267 self.explain_verbose = verbose;
268 }
269}
270
271impl RecordBatchStream for RecordBatchStreamAdapter {
272 fn name(&self) -> &str {
273 "RecordBatchStreamAdapter"
274 }
275
276 fn schema(&self) -> SchemaRef {
277 self.schema.clone()
278 }
279
280 fn metrics(&self) -> Option<RecordBatchMetrics> {
281 match &self.metrics_2 {
282 Metrics::Resolved(metrics) | Metrics::PartialResolved(_, metrics) => {
283 Some(metrics.clone())
284 }
285 Metrics::Unavailable | Metrics::Unresolved(_) => None,
286 }
287 }
288
289 fn output_ordering(&self) -> Option<&[OrderOption]> {
290 None
291 }
292}
293
294impl Stream for RecordBatchStreamAdapter {
295 type Item = Result<RecordBatch>;
296
297 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
298 let timer = self
299 .metrics
300 .as_ref()
301 .map(|m| m.elapsed_compute().clone())
302 .unwrap_or_default();
303 let _guard = timer.timer();
304 let poll_span = info_span!(parent: &self.span, "poll_next");
305 let _entered = poll_span.enter();
306 match Pin::new(&mut self.stream).poll_next(cx) {
307 Poll::Pending => Poll::Pending,
308 Poll::Ready(Some(df_record_batch)) => {
309 let df_record_batch = df_record_batch?;
310 if let Metrics::Unresolved(df_plan) | Metrics::PartialResolved(df_plan, _) =
311 &self.metrics_2
312 {
313 let mut metric_collector = MetricCollector::new(self.explain_verbose);
314 accept(df_plan.as_ref(), &mut metric_collector).unwrap();
315 self.metrics_2 = Metrics::PartialResolved(
316 df_plan.clone(),
317 metric_collector.record_batch_metrics,
318 );
319 }
320 Poll::Ready(Some(Ok(RecordBatch::from_df_record_batch(
321 self.schema(),
322 df_record_batch,
323 ))))
324 }
325 Poll::Ready(None) => {
326 if let Metrics::Unresolved(df_plan) | Metrics::PartialResolved(df_plan, _) =
327 &self.metrics_2
328 {
329 let mut metric_collector = MetricCollector::new(self.explain_verbose);
330 accept(df_plan.as_ref(), &mut metric_collector).unwrap();
331 self.metrics_2 = Metrics::Resolved(metric_collector.record_batch_metrics);
332 }
333 Poll::Ready(None)
334 }
335 }
336 }
337
338 #[inline]
339 fn size_hint(&self) -> (usize, Option<usize>) {
340 self.stream.size_hint()
341 }
342}
343
/// An [`ExecutionPlanVisitor`] that walks a physical plan tree and flattens
/// every node's metrics into a [`RecordBatchMetrics`].
pub struct MetricCollector {
    /// Depth of the node currently being visited; used for display indentation.
    current_level: usize,
    /// Accumulated metrics, filled in during the visit.
    pub record_batch_metrics: RecordBatchMetrics,
    /// Whether plan nodes are rendered with verbose formatting.
    verbose: bool,
}

impl MetricCollector {
    /// Creates an empty collector; `verbose` controls how plans are displayed.
    pub fn new(verbose: bool) -> Self {
        Self {
            current_level: 0,
            record_batch_metrics: RecordBatchMetrics::default(),
            verbose,
        }
    }
}
360
impl ExecutionPlanVisitor for MetricCollector {
    // Visiting is infallible; the never type makes callers' `.unwrap()` on
    // `accept` statically safe.
    type Error = !;

    /// Records one plan node: pushes its (aggregated, sorted) metrics and
    /// accumulates elapsed-compute and memory-usage totals.
    fn pre_visit(&mut self, plan: &dyn ExecutionPlan) -> std::result::Result<bool, Self::Error> {
        // A node without metrics still gets an entry (with just its name) so
        // the tree structure stays visible in the output.
        let Some(metric) = plan.metrics() else {
            self.record_batch_metrics.plan_metrics.push(PlanMetrics {
                plan: plan.name().to_string(),
                level: self.current_level,
                metrics: vec![],
            });
            self.current_level += 1;
            return Ok(true);
        };

        // Normalize: merge same-named metrics, order for display, and drop
        // timestamp metrics.
        let metric = metric
            .aggregate_by_name()
            .sorted_for_display()
            .timestamps_removed();
        let mut plan_metric = PlanMetrics {
            plan: one_line(plan, self.verbose).to_string(),
            level: self.current_level,
            metrics: Vec::with_capacity(metric.iter().size_hint().0),
        };
        for m in metric.iter() {
            plan_metric
                .metrics
                .push((m.value().name().to_string(), m.value().as_usize()));

            // Only these two metric kinds contribute to the stream-wide totals.
            match m.value() {
                MetricValue::ElapsedCompute(ec) => {
                    self.record_batch_metrics.elapsed_compute += ec.value()
                }
                MetricValue::CurrentMemoryUsage(m) => {
                    self.record_batch_metrics.memory_usage += m.value()
                }
                _ => {}
            }
        }
        self.record_batch_metrics.plan_metrics.push(plan_metric);

        self.current_level += 1;
        Ok(true)
    }

    /// Restores the depth counter when leaving a node.
    fn post_visit(&mut self, _plan: &dyn ExecutionPlan) -> std::result::Result<bool, Self::Error> {
        self.current_level -= 1;
        Ok(true)
    }
}
413
414fn one_line(plan: &dyn ExecutionPlan, verbose: bool) -> impl fmt::Display + '_ {
417 struct Wrapper<'a> {
418 plan: &'a dyn ExecutionPlan,
419 format_type: DisplayFormatType,
420 }
421
422 impl fmt::Display for Wrapper<'_> {
423 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
424 self.plan.fmt_as(self.format_type, f)?;
425 writeln!(f)
426 }
427 }
428
429 let format_type = if verbose {
430 DisplayFormatType::Verbose
431 } else {
432 DisplayFormatType::Default
433 };
434 Wrapper { plan, format_type }
435}
436
/// Aggregated execution metrics for a query plan, serializable so they can be
/// shipped alongside record batches.
#[derive(serde::Serialize, serde::Deserialize, Default, Debug, Clone)]
pub struct RecordBatchMetrics {
    /// Sum of `ElapsedCompute` metric values across all plan nodes
    /// (formatted as nanoseconds by the `Display` impl below).
    pub elapsed_compute: usize,
    /// Sum of `CurrentMemoryUsage` metric values across all plan nodes.
    pub memory_usage: usize,
    /// Per-node metrics, in pre-order of the plan tree.
    pub plan_metrics: Vec<PlanMetrics>,
    /// Optional per-region watermarks; omitted from the serialized form when
    /// empty so older consumers keep working.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub region_watermarks: Vec<RegionWatermarkEntry>,
}

/// Watermark information for a single region, attached to query metrics.
#[derive(serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub struct RegionWatermarkEntry {
    /// Numeric region identifier.
    pub region_id: u64,
    /// Watermark value; `None` is omitted from the serialized form.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub watermark: Option<u64>,
}
471
/// Heuristic: treats a metric as a duration when its name mentions
/// "elapsed", "time", or "cost" (rendered as nanoseconds in `Display`).
fn is_time_metric(metric_name: &str) -> bool {
    ["elapsed", "time", "cost"]
        .iter()
        .any(|keyword| metric_name.contains(keyword))
}
476
/// Heuristic: treats a metric as a byte count when its name mentions
/// "bytes" or "mem" (rendered human-readable in `Display`).
fn is_bytes_metric(metric_name: &str) -> bool {
    ["bytes", "mem"]
        .iter()
        .any(|keyword| metric_name.contains(keyword))
}
481
482fn format_bytes_human_readable(bytes: usize) -> String {
483 format!("{}", ReadableSize(bytes as u64))
484}
485
486impl Display for RecordBatchMetrics {
488 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
489 for metric in &self.plan_metrics {
490 write!(
491 f,
492 "{:indent$}{} metrics=[",
493 " ",
494 metric.plan.trim_end(),
495 indent = metric.level * 2,
496 )?;
497 for (label, value) in &metric.metrics {
498 if is_time_metric(label) {
499 write!(
500 f,
501 "{}: {}, ",
502 label,
503 format_nanoseconds_human_readable(*value),
504 )?;
505 } else if is_bytes_metric(label) {
506 write!(f, "{}: {}, ", label, format_bytes_human_readable(*value),)?;
507 } else {
508 write!(f, "{}: {}, ", label, value)?;
509 }
510 }
511 writeln!(f, "]")?;
512 }
513
514 Ok(())
515 }
516}
517
/// Metrics of a single plan node.
#[derive(serde::Serialize, serde::Deserialize, Default, Debug, Clone)]
pub struct PlanMetrics {
    /// One-line rendering of the plan node.
    pub plan: String,
    /// Depth of the node in the plan tree (drives display indentation).
    pub level: usize,
    /// `(metric name, value)` pairs recorded for this node.
    pub metrics: Vec<(String, usize)>,
}
528
/// State of an [`AsyncRecordBatchStreamAdapter`].
enum AsyncRecordBatchStreamAdapterState {
    /// Still waiting for the future that produces the stream.
    Uninit(FutureStream),
    /// The future resolved successfully; the stream is being forwarded.
    Ready(SendableRecordBatchStream),
    /// The future failed; the error was already emitted, so subsequent polls
    /// yield `None`.
    Failed,
}

/// Adapts a future that resolves to a stream into a stream: the future is
/// driven on first poll, then the resulting stream is forwarded.
pub struct AsyncRecordBatchStreamAdapter {
    schema: SchemaRef,
    state: AsyncRecordBatchStreamAdapterState,
}

impl AsyncRecordBatchStreamAdapter {
    /// Creates a new adapter. `schema` is reported as-is; it is not validated
    /// against the stream `stream` eventually resolves to.
    pub fn new(schema: SchemaRef, stream: FutureStream) -> Self {
        Self {
            schema,
            state: AsyncRecordBatchStreamAdapterState::Uninit(stream),
        }
    }
}
548
impl RecordBatchStream for AsyncRecordBatchStreamAdapter {
    /// Returns the schema supplied at construction time.
    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

    /// No ordering is known for the not-yet-resolved inner stream.
    fn output_ordering(&self) -> Option<&[OrderOption]> {
        None
    }

    /// This adapter does not collect metrics.
    fn metrics(&self) -> Option<RecordBatchMetrics> {
        None
    }
}
562
impl Stream for AsyncRecordBatchStreamAdapter {
    type Item = Result<RecordBatch>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        loop {
            match &mut self.state {
                AsyncRecordBatchStreamAdapterState::Uninit(stream_future) => {
                    // Drive the creation future first; on success loop back
                    // and poll the resolved stream immediately.
                    match ready!(Pin::new(stream_future).poll(cx)) {
                        Ok(stream) => {
                            self.state = AsyncRecordBatchStreamAdapterState::Ready(stream);
                            continue;
                        }
                        Err(e) => {
                            // Emit the creation error once; later polls end
                            // the stream via the `Failed` state.
                            self.state = AsyncRecordBatchStreamAdapterState::Failed;
                            return Poll::Ready(Some(Err(e)));
                        }
                    };
                }
                AsyncRecordBatchStreamAdapterState::Ready(stream) => {
                    return Poll::Ready(ready!(Pin::new(stream).poll_next(cx)));
                }
                AsyncRecordBatchStreamAdapterState::Failed => return Poll::Ready(None),
            }
        }
    }

    // Nothing is known about the inner stream until its future resolves, so
    // no size bound can be given.
    #[inline]
    fn size_hint(&self) -> (usize, Option<usize>) {
        (0, None)
    }
}
595
596fn custom_cast(
598 array: &dyn Array,
599 target_type: &ArrowDataType,
600 extype: Option<ColumnExtType>,
601) -> std::result::Result<Arc<dyn Array>, ArrowError> {
602 if let ArrowDataType::Map(_, _) = array.data_type()
603 && let ArrowDataType::Binary = target_type
604 {
605 return convert_map_to_json_binary(array, extype);
606 }
607
608 cast(array, target_type)
609}
610
611fn convert_map_to_json_binary(
613 array: &dyn Array,
614 extype: Option<ColumnExtType>,
615) -> std::result::Result<Arc<dyn Array>, ArrowError> {
616 use datatypes::arrow::array::{BinaryArray, MapArray};
617 use serde_json::Value;
618
619 let map_array = array
620 .as_any()
621 .downcast_ref::<MapArray>()
622 .ok_or_else(|| ArrowError::CastError("Failed to downcast to MapArray".to_string()))?;
623
624 let mut json_values = Vec::with_capacity(map_array.len());
625
626 for i in 0..map_array.len() {
627 if map_array.is_null(i) {
628 json_values.push(None);
629 } else {
630 let map_entry = map_array.value(i);
632 let key_value_array = map_entry
633 .as_any()
634 .downcast_ref::<datatypes::arrow::array::StructArray>()
635 .ok_or_else(|| {
636 ArrowError::CastError("Failed to downcast to StructArray".to_string())
637 })?;
638
639 let mut json_obj = serde_json::Map::with_capacity(key_value_array.len());
641
642 for j in 0..key_value_array.len() {
643 if key_value_array.is_null(j) {
644 continue;
645 }
646 let key_field = key_value_array.column(0);
647 let value_field = key_value_array.column(1);
648
649 if key_field.is_null(j) {
650 continue;
651 }
652
653 let key = key_field
654 .as_any()
655 .downcast_ref::<datatypes::arrow::array::StringArray>()
656 .ok_or_else(|| {
657 ArrowError::CastError("Failed to downcast key to StringArray".to_string())
658 })?
659 .value(j);
660
661 let value = if value_field.is_null(j) {
662 Value::Null
663 } else {
664 let value_str = value_field
665 .as_any()
666 .downcast_ref::<datatypes::arrow::array::StringArray>()
667 .ok_or_else(|| {
668 ArrowError::CastError(
669 "Failed to downcast value to StringArray".to_string(),
670 )
671 })?
672 .value(j);
673 Value::String(value_str.to_string())
674 };
675
676 json_obj.insert(key.to_string(), value);
677 }
678
679 let json_value = Value::Object(json_obj);
680 let json_bytes = match extype {
681 Some(ColumnExtType::Json) => {
682 let json_string = match serde_json::to_string(&json_value) {
683 Ok(s) => s,
684 Err(e) => {
685 return Err(ArrowError::CastError(format!(
686 "Failed to serialize JSON: {}",
687 e
688 )));
689 }
690 };
691 match jsonb::parse_value(json_string.as_bytes()) {
692 Ok(jsonb_value) => jsonb_value.to_vec(),
693 Err(e) => {
694 return Err(ArrowError::CastError(format!(
695 "Failed to serialize JSONB: {}",
696 e
697 )));
698 }
699 }
700 }
701 _ => match serde_json::to_vec(&json_value) {
702 Ok(b) => b,
703 Err(e) => {
704 return Err(ArrowError::CastError(format!(
705 "Failed to serialize JSON: {}",
706 e
707 )));
708 }
709 },
710 };
711 json_values.push(Some(json_bytes));
712 }
713 }
714
715 let binary_array = BinaryArray::from_iter(json_values);
716 Ok(Arc::new(binary_array))
717}
718
#[cfg(test)]
mod test {
    use common_error::ext::BoxedError;
    use common_error::mock::MockError;
    use common_error::status_code::StatusCode;
    use datatypes::arrow::array::{ArrayRef, MapArray, StringArray, StructArray};
    use datatypes::arrow::buffer::OffsetBuffer;
    use datatypes::arrow::datatypes::Field;
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::ColumnSchema;
    use datatypes::vectors::Int32Vector;
    use serde_json::json;
    use snafu::IntoError;

    use super::*;
    use crate::RecordBatches;
    use crate::error::Error;

    #[tokio::test]
    async fn test_async_recordbatch_stream_adaptor() {
        // Stream of canned results; `pop` yields from the back, so batches
        // come out in reverse of the order they were supplied.
        struct MaybeErrorRecordBatchStream {
            items: Vec<Result<RecordBatch>>,
        }

        impl RecordBatchStream for MaybeErrorRecordBatchStream {
            fn schema(&self) -> SchemaRef {
                unimplemented!()
            }

            fn output_ordering(&self) -> Option<&[OrderOption]> {
                None
            }

            fn metrics(&self) -> Option<RecordBatchMetrics> {
                None
            }
        }

        impl Stream for MaybeErrorRecordBatchStream {
            type Item = Result<RecordBatch>;

            fn poll_next(
                mut self: Pin<&mut Self>,
                _: &mut Context<'_>,
            ) -> Poll<Option<Self::Item>> {
                if let Some(batch) = self.items.pop() {
                    Poll::Ready(Some(Ok(batch?)))
                } else {
                    Poll::Ready(None)
                }
            }
        }

        // Wraps canned results in a `FutureStream`, mimicking a stream whose
        // construction itself may fail.
        fn new_future_stream(
            maybe_recordbatches: Result<Vec<Result<RecordBatch>>>,
        ) -> FutureStream {
            Box::pin(async move {
                maybe_recordbatches
                    .map(|items| Box::pin(MaybeErrorRecordBatchStream { items }) as _)
            })
        }

        let schema = Arc::new(Schema::new(vec![ColumnSchema::new(
            "a",
            ConcreteDataType::int32_datatype(),
            false,
        )]));
        let batch1 = RecordBatch::new(
            schema.clone(),
            vec![Arc::new(Int32Vector::from_slice([1])) as _],
        )
        .unwrap();
        let batch2 = RecordBatch::new(
            schema.clone(),
            vec![Arc::new(Int32Vector::from_slice([2])) as _],
        )
        .unwrap();

        // Happy path: both batches come back (reversed, due to `pop`).
        let success_stream = new_future_stream(Ok(vec![Ok(batch1.clone()), Ok(batch2.clone())]));
        let adapter = AsyncRecordBatchStreamAdapter::new(schema.clone(), success_stream);
        let collected = RecordBatches::try_collect(Box::pin(adapter)).await.unwrap();
        assert_eq!(
            collected,
            RecordBatches::try_new(schema.clone(), vec![batch2.clone(), batch1.clone()]).unwrap()
        );

        // An error while polling the inner stream is surfaced to the caller.
        let poll_err_stream = new_future_stream(Ok(vec![
            Ok(batch1.clone()),
            Err(error::ExternalSnafu
                .into_error(BoxedError::new(MockError::new(StatusCode::Unknown)))),
        ]));
        let adapter = AsyncRecordBatchStreamAdapter::new(schema.clone(), poll_err_stream);
        let err = RecordBatches::try_collect(Box::pin(adapter))
            .await
            .unwrap_err();
        assert!(
            matches!(err, Error::External { .. }),
            "unexpected err {err}"
        );

        // An error while *creating* the stream is surfaced on the first poll.
        let failed_to_init_stream =
            new_future_stream(Err(error::ExternalSnafu
                .into_error(BoxedError::new(MockError::new(StatusCode::Internal)))));
        let adapter = AsyncRecordBatchStreamAdapter::new(schema.clone(), failed_to_init_stream);
        let err = RecordBatches::try_collect(Box::pin(adapter))
            .await
            .unwrap_err();
        assert!(
            matches!(err, Error::External { .. }),
            "unexpected err {err}"
        );
    }

    #[test]
    fn test_convert_map_to_json_binary() {
        // Three map rows: {a:"1", b:null, c:"3"}, NULL, {x:"42"}.
        let keys = StringArray::from(vec![Some("a"), Some("b"), Some("c"), Some("x")]);
        let values = StringArray::from(vec![Some("1"), None, Some("3"), Some("42")]);
        let key_field = Arc::new(Field::new("key", ArrowDataType::Utf8, false));
        let value_field = Arc::new(Field::new("value", ArrowDataType::Utf8, true));
        let struct_type = ArrowDataType::Struct(vec![key_field, value_field].into());

        let entries_field = Arc::new(Field::new("entries", struct_type, false));

        let struct_array = StructArray::from(vec![
            (
                Arc::new(Field::new("key", ArrowDataType::Utf8, false)),
                Arc::new(keys) as ArrayRef,
            ),
            (
                Arc::new(Field::new("value", ArrowDataType::Utf8, true)),
                Arc::new(values) as ArrayRef,
            ),
        ]);

        // Row lengths 3, 0 (the NULL row), 1.
        let offsets = OffsetBuffer::from_lengths([3, 0, 1]);
        let nulls = datatypes::arrow::buffer::NullBuffer::from(vec![true, false, true]);

        let map_array = MapArray::new(
            entries_field,
            offsets,
            struct_array,
            Some(nulls), false,
        );

        // Without an extension type the output is plain JSON text.
        let result = convert_map_to_json_binary(&map_array, None).unwrap();
        let binary_array = result
            .as_any()
            .downcast_ref::<datatypes::arrow::array::BinaryArray>()
            .unwrap();

        let expected_jsons = [
            Some(r#"{"a":"1","b":null,"c":"3"}"#),
            None,
            Some(r#"{"x":"42"}"#),
        ];

        for (i, _) in expected_jsons.iter().enumerate() {
            if let Some(expected) = &expected_jsons[i] {
                assert!(!binary_array.is_null(i));
                let actual_bytes = binary_array.value(i);
                let actual_str = std::str::from_utf8(actual_bytes).unwrap();
                assert_eq!(actual_str, *expected);
            } else {
                assert!(binary_array.is_null(i));
            }
        }

        // With `ColumnExtType::Json` the output is JSONB — a binary encoding,
        // so it must differ from the plain JSON text bytes.
        let result_json =
            convert_map_to_json_binary(&map_array, Some(ColumnExtType::Json)).unwrap();
        let binary_array_json = result_json
            .as_any()
            .downcast_ref::<datatypes::arrow::array::BinaryArray>()
            .unwrap();

        for (i, _) in expected_jsons.iter().enumerate() {
            if expected_jsons[i].is_some() {
                assert!(!binary_array_json.is_null(i));
                let actual_bytes = binary_array_json.value(i);
                assert_ne!(actual_bytes, expected_jsons[i].unwrap().as_bytes());
            } else {
                assert!(binary_array_json.is_null(i));
            }
        }
    }

    #[test]
    fn test_recordbatch_metrics_deserializes_without_region_watermarks() {
        // `region_watermarks` has `#[serde(default)]`, so payloads from older
        // versions without the field must still deserialize.
        let metrics: RecordBatchMetrics = serde_json::from_value(json!({
            "elapsed_compute": 12,
            "memory_usage": 34,
            "plan_metrics": []
        }))
        .unwrap();

        assert!(metrics.region_watermarks.is_empty());
        assert_eq!(metrics.elapsed_compute, 12);
        assert_eq!(metrics.memory_usage, 34);
    }

    #[test]
    fn test_recordbatch_metrics_region_watermarks_serde_roundtrip() {
        let metrics = RecordBatchMetrics {
            region_watermarks: vec![
                RegionWatermarkEntry {
                    region_id: 1,
                    watermark: Some(100),
                },
                RegionWatermarkEntry {
                    region_id: 2,
                    watermark: None,
                },
            ],
            ..Default::default()
        };

        let value = serde_json::to_value(&metrics).unwrap();
        // A `None` watermark is skipped during serialization.
        assert_eq!(
            value.get("region_watermarks").unwrap(),
            &json!([
                { "region_id": 1, "watermark": 100 },
                { "region_id": 2 }
            ])
        );

        let decoded: RecordBatchMetrics = serde_json::from_value(value).unwrap();
        assert_eq!(decoded.region_watermarks, metrics.region_watermarks);
    }

    #[test]
    fn test_recordbatch_metrics_skips_empty_region_watermarks_on_serialize() {
        // An empty list is skipped entirely via `skip_serializing_if`.
        let value = serde_json::to_value(RecordBatchMetrics::default()).unwrap();
        assert!(value.get("region_watermarks").is_none());
    }
}