1use std::fmt::{self, Display};
16use std::future::Future;
17use std::marker::PhantomData;
18use std::pin::Pin;
19use std::str::FromStr;
20use std::sync::Arc;
21use std::sync::atomic::{AtomicU64, Ordering};
22use std::task::{Context, Poll};
23
24use common_base::readable_size::ReadableSize;
25use common_telemetry::tracing::{Span, info_span};
26use common_time::util::format_nanoseconds_human_readable;
27use datafusion::arrow::compute::cast;
28use datafusion::arrow::datatypes::SchemaRef as DfSchemaRef;
29use datafusion::error::Result as DfResult;
30use datafusion::execution::context::ExecutionProps;
31use datafusion::logical_expr::Expr;
32use datafusion::logical_expr::utils::conjunction;
33use datafusion::physical_expr::create_physical_expr;
34use datafusion::physical_plan::metrics::{BaselineMetrics, MetricValue};
35use datafusion::physical_plan::{
36 DisplayFormatType, ExecutionPlan, ExecutionPlanVisitor, PhysicalExpr,
37 RecordBatchStream as DfRecordBatchStream, accept,
38};
39use datafusion_common::arrow::error::ArrowError;
40use datafusion_common::{DataFusionError, ToDFSchema};
41use datatypes::arrow::array::Array;
42use datatypes::arrow::datatypes::DataType as ArrowDataType;
43use datatypes::schema::{ColumnExtType, Schema, SchemaRef};
44use futures::ready;
45use jsonb;
46use pin_project::pin_project;
47use snafu::ResultExt;
48
49use crate::error::{self, Result};
50use crate::filter::batch_filter;
51use crate::{
52 DfRecordBatch, DfSendableRecordBatchStream, OrderOption, RecordBatch, RecordBatchStream,
53 SendableRecordBatchStream, Stream,
54};
55
/// Plan name that identifies region scan nodes.
///
/// `region_scan_output_bytes` compares it against `PlanMetrics::plan_name`
/// to decide which plan nodes contribute to the scanned-bytes total.
const REGION_SCAN_EXEC_NAME: &str = "RegionScanExec";

/// A boxed, sendable future that resolves to a [`SendableRecordBatchStream`]
/// or to an error. Used as the lazy initializer of
/// [`AsyncRecordBatchStreamAdapter`].
type FutureStream =
    Pin<Box<dyn std::future::Future<Output = Result<SendableRecordBatchStream>> + Send>>;
60
/// Adapts a stream of `Result<DfRecordBatch, E>` into a DataFusion record
/// batch stream, projecting each batch, casting its columns to
/// `projected_schema` and optionally filtering rows with a predicate.
#[pin_project]
pub struct RecordBatchStreamTypeAdapter<T, E> {
    /// The wrapped stream; structurally pinned so it can be polled in place.
    #[pin]
    stream: T,
    /// Schema every emitted batch is cast to.
    projected_schema: DfSchemaRef,
    /// Column indices selected from each incoming batch.
    projection: Vec<usize>,
    /// Optional row filter applied after casting; set by `with_filter`.
    predicate: Option<Arc<dyn PhysicalExpr>>,
    /// Ties the error type `E` to the adapter without storing a value of it.
    phantom: PhantomData<E>,
}
71
72impl<T, E> RecordBatchStreamTypeAdapter<T, E>
73where
74 T: Stream<Item = std::result::Result<DfRecordBatch, E>>,
75 E: std::error::Error + Send + Sync + 'static,
76{
77 pub fn new(projected_schema: DfSchemaRef, stream: T, projection: Option<Vec<usize>>) -> Self {
78 let projection = if let Some(projection) = projection {
79 projection
80 } else {
81 (0..projected_schema.fields().len()).collect()
82 };
83
84 Self {
85 stream,
86 projected_schema,
87 projection,
88 predicate: None,
89 phantom: Default::default(),
90 }
91 }
92
93 pub fn with_filter(mut self, filters: Vec<Expr>) -> Result<Self> {
94 let filters = if let Some(expr) = conjunction(filters) {
95 let df_schema = self
96 .projected_schema
97 .clone()
98 .to_dfschema_ref()
99 .context(error::PhysicalExprSnafu)?;
100
101 let filters = create_physical_expr(&expr, &df_schema, &ExecutionProps::new())
102 .context(error::PhysicalExprSnafu)?;
103 Some(filters)
104 } else {
105 None
106 };
107 self.predicate = filters;
108 Ok(self)
109 }
110}
111
112impl<T, E> DfRecordBatchStream for RecordBatchStreamTypeAdapter<T, E>
113where
114 T: Stream<Item = std::result::Result<DfRecordBatch, E>>,
115 E: std::error::Error + Send + Sync + 'static,
116{
117 fn schema(&self) -> DfSchemaRef {
118 self.projected_schema.clone()
119 }
120}
121
122impl<T, E> Stream for RecordBatchStreamTypeAdapter<T, E>
123where
124 T: Stream<Item = std::result::Result<DfRecordBatch, E>>,
125 E: std::error::Error + Send + Sync + 'static,
126{
127 type Item = DfResult<DfRecordBatch>;
128
129 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
130 let this = self.project();
131
132 let batch = futures::ready!(this.stream.poll_next(cx))
133 .map(|r| r.map_err(|e| DataFusionError::External(Box::new(e))));
134
135 let projected_schema = this.projected_schema.clone();
136 let projection = this.projection.clone();
137 let predicate = this.predicate.clone();
138
139 let batch = batch.map(|b| {
140 b.and_then(|b| {
141 let projected_column = b.project(&projection)?;
142 if projected_column.schema().fields.len() != projected_schema.fields.len() {
143 return Err(DataFusionError::ArrowError(Box::new(ArrowError::SchemaError(format!(
144 "Trying to cast a RecordBatch into an incompatible schema. RecordBatch: {}, Target: {}",
145 projected_column.schema(),
146 projected_schema,
147 ))), None));
148 }
149
150 let mut columns = Vec::with_capacity(projected_schema.fields.len());
151 for (idx,field) in projected_schema.fields.iter().enumerate() {
152 let column = projected_column.column(idx);
153 let extype = field.metadata().get("greptime:type").and_then(|s| ColumnExtType::from_str(s).ok());
154 let output = custom_cast(&column, field.data_type(), extype)?;
155 columns.push(output)
156 }
157 let record_batch = DfRecordBatch::try_new(projected_schema, columns)?;
158 let record_batch = if let Some(predicate) = predicate {
159 batch_filter(&record_batch, &predicate)?
160 } else {
161 record_batch
162 };
163 Ok(record_batch)
164 })
165 });
166
167 Poll::Ready(batch)
168 }
169
170 #[inline]
171 fn size_hint(&self) -> (usize, Option<usize>) {
172 self.stream.size_hint()
173 }
174}
175
/// Exposes a [`SendableRecordBatchStream`] as a DataFusion record batch
/// stream by converting each item into a `DfRecordBatch`.
pub struct DfRecordBatchStreamAdapter {
    /// The wrapped stream whose batches and schema are converted on the fly.
    stream: SendableRecordBatchStream,
}
181
182impl DfRecordBatchStreamAdapter {
183 pub fn new(stream: SendableRecordBatchStream) -> Self {
184 Self { stream }
185 }
186}
187
188impl DfRecordBatchStream for DfRecordBatchStreamAdapter {
189 fn schema(&self) -> DfSchemaRef {
190 self.stream.schema().arrow_schema().clone()
191 }
192}
193
194impl Stream for DfRecordBatchStreamAdapter {
195 type Item = DfResult<DfRecordBatch>;
196
197 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
198 match Pin::new(&mut self.stream).poll_next(cx) {
199 Poll::Pending => Poll::Pending,
200 Poll::Ready(Some(recordbatch)) => match recordbatch {
201 Ok(recordbatch) => Poll::Ready(Some(Ok(recordbatch.into_df_record_batch()))),
202 Err(e) => Poll::Ready(Some(Err(DataFusionError::External(Box::new(e))))),
203 },
204 Poll::Ready(None) => Poll::Ready(None),
205 }
206 }
207
208 #[inline]
209 fn size_hint(&self) -> (usize, Option<usize>) {
210 self.stream.size_hint()
211 }
212}
213
/// Adapts a DataFusion stream into a [`RecordBatchStream`], optionally
/// collecting execution-plan metrics and reporting query statistics to
/// shared counters when dropped.
pub struct RecordBatchStreamAdapter {
    /// Schema converted from the DataFusion stream's Arrow schema.
    schema: SchemaRef,
    /// The wrapped DataFusion stream.
    stream: DfSendableRecordBatchStream,
    /// Baseline metrics used to time `poll_next`; the constructors in this
    /// file always leave it as `None`.
    metrics: Option<BaselineMetrics>,
    /// Plan-level metrics state, advanced while the stream is polled.
    metrics_2: Metrics,
    /// Region id copied into every collected metrics snapshot.
    query_load_region_id: Option<u64>,
    /// Counters updated on drop; nothing is recorded when `None`.
    query_stat_counters: Option<RegionQueryStatCounters>,
    /// Whether plan descriptions are rendered in verbose format.
    explain_verbose: bool,
    /// Parent span for the per-poll `poll_next` spans.
    span: Span,
}
229
/// Shared counters that accumulate query statistics.
///
/// Cloning shares the underlying atomics, so every clone updates the same
/// totals.
#[derive(Debug, Clone)]
pub struct RegionQueryStatCounters {
    /// Accumulated `elapsed_compute` of recorded queries
    /// (presumably nanoseconds — TODO confirm the unit with the producer).
    pub query_cpu_time: Arc<AtomicU64>,
    /// Accumulated `output_bytes` reported by region scan plan nodes.
    pub query_scanned_bytes: Arc<AtomicU64>,
}
238
/// Progress of plan-metrics collection for a [`RecordBatchStreamAdapter`].
enum Metrics {
    /// No execution plan was attached, so no metrics can be collected.
    Unavailable,
    /// A plan is attached but no snapshot has been taken yet.
    Unresolved(Arc<dyn ExecutionPlan>),
    /// A snapshot was taken after a batch was produced; the stream has not
    /// finished, so the plan is kept for later snapshots.
    PartialResolved(Arc<dyn ExecutionPlan>, RecordBatchMetrics),
    /// Final snapshot, taken once the stream reported its end.
    Resolved(RecordBatchMetrics),
}
246
247impl RecordBatchStreamAdapter {
248 pub fn try_new(stream: DfSendableRecordBatchStream) -> Result<Self> {
249 let schema =
250 Arc::new(Schema::try_from(stream.schema()).context(error::SchemaConversionSnafu)?);
251 Ok(Self {
252 schema,
253 stream,
254 metrics: None,
255 metrics_2: Metrics::Unavailable,
256 query_load_region_id: None,
257 query_stat_counters: None,
258 explain_verbose: false,
259 span: Span::current(),
260 })
261 }
262
263 pub fn try_new_with_span(stream: DfSendableRecordBatchStream, span: Span) -> Result<Self> {
264 let schema =
265 Arc::new(Schema::try_from(stream.schema()).context(error::SchemaConversionSnafu)?);
266 let subspan = info_span!(parent: &span, "RecordBatchStreamAdapter");
267 Ok(Self {
268 schema,
269 stream,
270 metrics: None,
271 metrics_2: Metrics::Unavailable,
272 query_load_region_id: None,
273 query_stat_counters: None,
274 explain_verbose: false,
275 span: subspan,
276 })
277 }
278
279 pub fn set_metrics2(&mut self, plan: Arc<dyn ExecutionPlan>) {
280 self.metrics_2 = Metrics::Unresolved(plan)
281 }
282
283 fn record_query_stats_on_drop(&self) {
284 let Some(counters) = &self.query_stat_counters else {
285 return;
286 };
287
288 match &self.metrics_2 {
289 Metrics::Unresolved(df_plan) | Metrics::PartialResolved(df_plan, _) => {
290 let mut metric_collector = MetricCollector::new(self.explain_verbose);
291 accept(df_plan.as_ref(), &mut metric_collector).unwrap();
292 metric_collector.record_batch_metrics.query_load_region_id =
293 self.query_load_region_id;
294 record_query_stats(counters, &metric_collector.record_batch_metrics);
295 }
296 Metrics::Resolved(metrics) => record_query_stats(counters, metrics),
297 Metrics::Unavailable => {}
298 }
299 }
300
301 pub fn set_query_load_region_id(&mut self, region_id: Option<u64>) {
302 self.query_load_region_id = region_id;
303 }
304
305 pub fn set_query_stat_counters(&mut self, counters: Option<RegionQueryStatCounters>) {
306 self.query_stat_counters = counters;
307 }
308
309 pub fn set_explain_verbose(&mut self, verbose: bool) {
311 self.explain_verbose = verbose;
312 }
313}
314
315pub fn region_scan_output_bytes(metrics: &RecordBatchMetrics) -> usize {
317 metrics
318 .plan_metrics
319 .iter()
320 .filter(|pm| pm.plan_name == REGION_SCAN_EXEC_NAME)
321 .flat_map(|pm| &pm.metrics)
322 .filter_map(|(name, value)| (name == "output_bytes").then_some(*value))
323 .sum()
324}
325
326fn record_query_stats(counters: &RegionQueryStatCounters, metrics: &RecordBatchMetrics) {
327 counters
328 .query_cpu_time
329 .fetch_add(metrics.elapsed_compute as u64, Ordering::Relaxed);
330 counters
331 .query_scanned_bytes
332 .fetch_add(region_scan_output_bytes(metrics) as u64, Ordering::Relaxed);
333}
334
335impl RecordBatchStream for RecordBatchStreamAdapter {
336 fn name(&self) -> &str {
337 "RecordBatchStreamAdapter"
338 }
339
340 fn schema(&self) -> SchemaRef {
341 self.schema.clone()
342 }
343
344 fn metrics(&self) -> Option<RecordBatchMetrics> {
345 match &self.metrics_2 {
346 Metrics::Resolved(metrics) | Metrics::PartialResolved(_, metrics) => {
347 Some(metrics.clone())
348 }
349 Metrics::Unavailable | Metrics::Unresolved(_) => None,
350 }
351 }
352
353 fn output_ordering(&self) -> Option<&[OrderOption]> {
354 None
355 }
356}
357
358impl Stream for RecordBatchStreamAdapter {
359 type Item = Result<RecordBatch>;
360
361 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
362 let timer = self
363 .metrics
364 .as_ref()
365 .map(|m| m.elapsed_compute().clone())
366 .unwrap_or_default();
367 let _guard = timer.timer();
368 let poll_span = info_span!(parent: &self.span, "poll_next");
369 let _entered = poll_span.enter();
370 match Pin::new(&mut self.stream).poll_next(cx) {
371 Poll::Pending => Poll::Pending,
372 Poll::Ready(Some(df_record_batch)) => {
373 let df_record_batch = df_record_batch?;
374 if let Metrics::Unresolved(df_plan) | Metrics::PartialResolved(df_plan, _) =
375 &self.metrics_2
376 {
377 let mut metric_collector = MetricCollector::new(self.explain_verbose);
378 accept(df_plan.as_ref(), &mut metric_collector).unwrap();
379 metric_collector.record_batch_metrics.query_load_region_id =
380 self.query_load_region_id;
381 self.metrics_2 = Metrics::PartialResolved(
382 df_plan.clone(),
383 metric_collector.record_batch_metrics,
384 );
385 }
386 Poll::Ready(Some(Ok(RecordBatch::from_df_record_batch(
387 self.schema(),
388 df_record_batch,
389 ))))
390 }
391 Poll::Ready(None) => {
392 if let Metrics::Unresolved(df_plan) | Metrics::PartialResolved(df_plan, _) =
393 &self.metrics_2
394 {
395 let mut metric_collector = MetricCollector::new(self.explain_verbose);
396 accept(df_plan.as_ref(), &mut metric_collector).unwrap();
397 metric_collector.record_batch_metrics.query_load_region_id =
398 self.query_load_region_id;
399 self.metrics_2 = Metrics::Resolved(metric_collector.record_batch_metrics);
400 }
401 Poll::Ready(None)
402 }
403 }
404 }
405
406 #[inline]
407 fn size_hint(&self) -> (usize, Option<usize>) {
408 self.stream.size_hint()
409 }
410}
411
412impl Drop for RecordBatchStreamAdapter {
413 fn drop(&mut self) {
414 self.record_query_stats_on_drop();
415 }
416}
417
/// An [`ExecutionPlanVisitor`] that gathers the metrics of every node of an
/// execution plan into a [`RecordBatchMetrics`].
pub struct MetricCollector {
    /// Depth of the node currently being visited; incremented in
    /// `pre_visit` and decremented in `post_visit`.
    current_level: usize,
    /// Metrics accumulated so far.
    pub record_batch_metrics: RecordBatchMetrics,
    /// Whether plan descriptions are rendered in verbose format.
    verbose: bool,
}
424
425impl MetricCollector {
426 pub fn new(verbose: bool) -> Self {
427 Self {
428 current_level: 0,
429 record_batch_metrics: RecordBatchMetrics::default(),
430 verbose,
431 }
432 }
433}
434
435impl ExecutionPlanVisitor for MetricCollector {
436 type Error = !;
437
438 fn pre_visit(&mut self, plan: &dyn ExecutionPlan) -> std::result::Result<bool, Self::Error> {
439 let Some(metric) = plan.metrics() else {
441 self.record_batch_metrics.plan_metrics.push(PlanMetrics {
442 plan: plan.name().to_string(),
443 plan_name: plan.name().to_string(),
444 level: self.current_level,
445 metrics: vec![],
446 });
447 self.current_level += 1;
448 return Ok(true);
449 };
450
451 let metric = metric
453 .aggregate_by_name()
454 .sorted_for_display()
455 .timestamps_removed();
456 let mut plan_metric = PlanMetrics {
457 plan: one_line(plan, self.verbose).to_string(),
458 plan_name: plan.name().to_string(),
459 level: self.current_level,
460 metrics: Vec::with_capacity(metric.iter().size_hint().0),
461 };
462 for m in metric.iter() {
463 plan_metric
464 .metrics
465 .push((m.value().name().to_string(), m.value().as_usize()));
466
467 match m.value() {
469 MetricValue::ElapsedCompute(ec) => {
470 self.record_batch_metrics.elapsed_compute += ec.value()
471 }
472 MetricValue::CurrentMemoryUsage(m) => {
473 self.record_batch_metrics.memory_usage += m.value()
474 }
475 _ => {}
476 }
477 }
478 self.record_batch_metrics.plan_metrics.push(plan_metric);
479
480 self.current_level += 1;
481 Ok(true)
482 }
483
484 fn post_visit(&mut self, _plan: &dyn ExecutionPlan) -> std::result::Result<bool, Self::Error> {
485 self.current_level -= 1;
486 Ok(true)
487 }
488}
489
490fn one_line(plan: &dyn ExecutionPlan, verbose: bool) -> impl fmt::Display + '_ {
493 struct Wrapper<'a> {
494 plan: &'a dyn ExecutionPlan,
495 format_type: DisplayFormatType,
496 }
497
498 impl fmt::Display for Wrapper<'_> {
499 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
500 self.plan.fmt_as(self.format_type, f)?;
501 writeln!(f)
502 }
503 }
504
505 let format_type = if verbose {
506 DisplayFormatType::Verbose
507 } else {
508 DisplayFormatType::Default
509 };
510 Wrapper { plan, format_type }
511}
512
/// Metrics gathered from an execution plan, serializable with serde.
///
/// Field order is part of the serialized layout; do not reorder.
#[derive(serde::Serialize, serde::Deserialize, Default, Debug, Clone)]
pub struct RecordBatchMetrics {
    /// Sum of all `ElapsedCompute` metric values found in the plan
    /// (presumably nanoseconds — TODO confirm against DataFusion's `Time`).
    pub elapsed_compute: usize,
    /// Sum of all `CurrentMemoryUsage` metric values found in the plan.
    pub memory_usage: usize,
    /// One entry per visited plan node, in visiting order.
    pub plan_metrics: Vec<PlanMetrics>,
    /// Region id attached by the stream adapter; omitted from the serialized
    /// form when `None` and defaulted to `None` when absent.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub query_load_region_id: Option<u64>,
    /// Per-region watermark entries; omitted from the serialized form when
    /// empty and defaulted to empty when absent.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub region_watermarks: Vec<RegionWatermarkEntry>,
}
543
/// Watermark reported for a single region.
///
/// The derived ordering compares `region_id` first and `watermark` second,
/// so the field order is significant.
#[derive(serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub struct RegionWatermarkEntry {
    /// Identifier of the region the watermark belongs to.
    pub region_id: u64,
    /// The watermark value; omitted from the serialized form when `None`
    /// and defaulted to `None` when absent.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub watermark: Option<u64>,
}
550
/// Returns `true` when `metric_name` looks like a duration metric, i.e. it
/// contains `elapsed`, `time` or `cost` (case-sensitive substring match).
fn is_time_metric(metric_name: &str) -> bool {
    ["elapsed", "time", "cost"]
        .iter()
        .any(|needle| metric_name.contains(*needle))
}
555
/// Returns `true` when `metric_name` looks like a size metric, i.e. it
/// contains `bytes` or `mem` (case-sensitive substring match).
fn is_bytes_metric(metric_name: &str) -> bool {
    ["bytes", "mem"]
        .iter()
        .any(|needle| metric_name.contains(*needle))
}
560
561fn format_bytes_human_readable(bytes: usize) -> String {
562 format!("{}", ReadableSize(bytes as u64))
563}
564
565impl Display for RecordBatchMetrics {
567 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
568 for metric in &self.plan_metrics {
569 write!(
570 f,
571 "{:indent$}{} metrics=[",
572 " ",
573 metric.plan.trim_end(),
574 indent = metric.level * 2,
575 )?;
576 for (label, value) in &metric.metrics {
577 if is_time_metric(label) {
578 write!(
579 f,
580 "{}: {}, ",
581 label,
582 format_nanoseconds_human_readable(*value),
583 )?;
584 } else if is_bytes_metric(label) {
585 write!(f, "{}: {}, ", label, format_bytes_human_readable(*value),)?;
586 } else {
587 write!(f, "{}: {}, ", label, value)?;
588 }
589 }
590 writeln!(f, "]")?;
591 }
592
593 Ok(())
594 }
595}
596
/// Metrics of a single execution plan node.
#[derive(serde::Serialize, serde::Deserialize, Default, Debug, Clone)]
pub struct PlanMetrics {
    /// Display text of the node: its formatted description, or just its
    /// name when the node exposes no metrics.
    pub plan: String,
    /// The node's plan name; defaults to an empty string when absent from
    /// the serialized input.
    #[serde(default)]
    pub plan_name: String,
    /// Depth of the node in the plan tree, starting at `0` for the root.
    pub level: usize,
    /// `(metric name, value)` pairs collected for the node.
    pub metrics: Vec<(String, usize)>,
}
610
/// Lifecycle of an [`AsyncRecordBatchStreamAdapter`].
enum AsyncRecordBatchStreamAdapterState {
    /// The future that produces the inner stream has not completed yet.
    Uninit(FutureStream),
    /// The inner stream is available and is being polled.
    Ready(SendableRecordBatchStream),
    /// The future failed; the error has already been yielded and the
    /// adapter only reports end of stream from now on.
    Failed,
}
616
/// A record batch stream whose inner stream is produced lazily by a future
/// that is first polled on the first `poll_next`.
pub struct AsyncRecordBatchStreamAdapter {
    /// Schema reported before and after the inner stream is available.
    schema: SchemaRef,
    /// Current lifecycle state.
    state: AsyncRecordBatchStreamAdapterState,
}
621
622impl AsyncRecordBatchStreamAdapter {
623 pub fn new(schema: SchemaRef, stream: FutureStream) -> Self {
624 Self {
625 schema,
626 state: AsyncRecordBatchStreamAdapterState::Uninit(stream),
627 }
628 }
629}
630
631impl RecordBatchStream for AsyncRecordBatchStreamAdapter {
632 fn schema(&self) -> SchemaRef {
633 self.schema.clone()
634 }
635
636 fn output_ordering(&self) -> Option<&[OrderOption]> {
637 None
638 }
639
640 fn metrics(&self) -> Option<RecordBatchMetrics> {
641 None
642 }
643}
644
645impl Stream for AsyncRecordBatchStreamAdapter {
646 type Item = Result<RecordBatch>;
647
648 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
649 loop {
650 match &mut self.state {
651 AsyncRecordBatchStreamAdapterState::Uninit(stream_future) => {
652 match ready!(Pin::new(stream_future).poll(cx)) {
653 Ok(stream) => {
654 self.state = AsyncRecordBatchStreamAdapterState::Ready(stream);
655 continue;
656 }
657 Err(e) => {
658 self.state = AsyncRecordBatchStreamAdapterState::Failed;
659 return Poll::Ready(Some(Err(e)));
660 }
661 };
662 }
663 AsyncRecordBatchStreamAdapterState::Ready(stream) => {
664 return Poll::Ready(ready!(Pin::new(stream).poll_next(cx)));
665 }
666 AsyncRecordBatchStreamAdapterState::Failed => return Poll::Ready(None),
667 }
668 }
669 }
670
671 #[inline]
673 fn size_hint(&self) -> (usize, Option<usize>) {
674 (0, None)
675 }
676}
677
678fn custom_cast(
680 array: &dyn Array,
681 target_type: &ArrowDataType,
682 extype: Option<ColumnExtType>,
683) -> std::result::Result<Arc<dyn Array>, ArrowError> {
684 if let ArrowDataType::Map(_, _) = array.data_type()
685 && let ArrowDataType::Binary = target_type
686 {
687 return convert_map_to_json_binary(array, extype);
688 }
689
690 cast(array, target_type)
691}
692
693fn convert_map_to_json_binary(
695 array: &dyn Array,
696 extype: Option<ColumnExtType>,
697) -> std::result::Result<Arc<dyn Array>, ArrowError> {
698 use datatypes::arrow::array::{BinaryArray, MapArray};
699 use serde_json::Value;
700
701 let map_array = array
702 .as_any()
703 .downcast_ref::<MapArray>()
704 .ok_or_else(|| ArrowError::CastError("Failed to downcast to MapArray".to_string()))?;
705
706 let mut json_values = Vec::with_capacity(map_array.len());
707
708 for i in 0..map_array.len() {
709 if map_array.is_null(i) {
710 json_values.push(None);
711 } else {
712 let map_entry = map_array.value(i);
714 let key_value_array = map_entry
715 .as_any()
716 .downcast_ref::<datatypes::arrow::array::StructArray>()
717 .ok_or_else(|| {
718 ArrowError::CastError("Failed to downcast to StructArray".to_string())
719 })?;
720
721 let mut json_obj = serde_json::Map::with_capacity(key_value_array.len());
723
724 for j in 0..key_value_array.len() {
725 if key_value_array.is_null(j) {
726 continue;
727 }
728 let key_field = key_value_array.column(0);
729 let value_field = key_value_array.column(1);
730
731 if key_field.is_null(j) {
732 continue;
733 }
734
735 let key = key_field
736 .as_any()
737 .downcast_ref::<datatypes::arrow::array::StringArray>()
738 .ok_or_else(|| {
739 ArrowError::CastError("Failed to downcast key to StringArray".to_string())
740 })?
741 .value(j);
742
743 let value = if value_field.is_null(j) {
744 Value::Null
745 } else {
746 let value_str = value_field
747 .as_any()
748 .downcast_ref::<datatypes::arrow::array::StringArray>()
749 .ok_or_else(|| {
750 ArrowError::CastError(
751 "Failed to downcast value to StringArray".to_string(),
752 )
753 })?
754 .value(j);
755 Value::String(value_str.to_string())
756 };
757
758 json_obj.insert(key.to_string(), value);
759 }
760
761 let json_value = Value::Object(json_obj);
762 let json_bytes = match extype {
763 Some(ColumnExtType::Json) => {
764 let json_string = match serde_json::to_string(&json_value) {
765 Ok(s) => s,
766 Err(e) => {
767 return Err(ArrowError::CastError(format!(
768 "Failed to serialize JSON: {}",
769 e
770 )));
771 }
772 };
773 match jsonb::parse_value(json_string.as_bytes()) {
774 Ok(jsonb_value) => jsonb_value.to_vec(),
775 Err(e) => {
776 return Err(ArrowError::CastError(format!(
777 "Failed to serialize JSONB: {}",
778 e
779 )));
780 }
781 }
782 }
783 _ => match serde_json::to_vec(&json_value) {
784 Ok(b) => b,
785 Err(e) => {
786 return Err(ArrowError::CastError(format!(
787 "Failed to serialize JSON: {}",
788 e
789 )));
790 }
791 },
792 };
793 json_values.push(Some(json_bytes));
794 }
795 }
796
797 let binary_array = BinaryArray::from_iter(json_values);
798 Ok(Arc::new(binary_array))
799}
800
#[cfg(test)]
mod test {
    use common_error::ext::BoxedError;
    use common_error::mock::MockError;
    use common_error::status_code::StatusCode;
    use datatypes::arrow::array::{ArrayRef, MapArray, StringArray, StructArray};
    use datatypes::arrow::buffer::OffsetBuffer;
    use datatypes::arrow::datatypes::Field;
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::ColumnSchema;
    use datatypes::vectors::Int32Vector;
    use serde_json::json;
    use snafu::IntoError;

    use super::*;
    use crate::RecordBatches;
    use crate::error::Error;

    /// Schema with a single non-nullable `a: Int32` column.
    fn single_int32_schema() -> SchemaRef {
        Arc::new(Schema::new(vec![ColumnSchema::new(
            "a",
            ConcreteDataType::int32_datatype(),
            false,
        )]))
    }

    /// Counters pre-loaded with 10 (cpu time) and 20 (scanned bytes).
    fn preloaded_counters() -> RegionQueryStatCounters {
        RegionQueryStatCounters {
            query_cpu_time: Arc::new(AtomicU64::new(10)),
            query_scanned_bytes: Arc::new(AtomicU64::new(20)),
        }
    }

    /// Metrics with 2_000_000 of compute time and one region scan node that
    /// reports 42 output bytes.
    fn region_scan_metrics() -> RecordBatchMetrics {
        RecordBatchMetrics {
            elapsed_compute: 2_000_000,
            plan_metrics: vec![PlanMetrics {
                plan: "RegionScanExec: region=1".to_string(),
                plan_name: REGION_SCAN_EXEC_NAME.to_string(),
                level: 0,
                metrics: vec![("output_bytes".to_string(), 42)],
            }],
            ..Default::default()
        }
    }

    #[tokio::test]
    async fn test_async_recordbatch_stream_adaptor() {
        /// Yields its items from the back of the vector, errors included.
        struct MaybeErrorRecordBatchStream {
            items: Vec<Result<RecordBatch>>,
        }

        impl RecordBatchStream for MaybeErrorRecordBatchStream {
            fn schema(&self) -> SchemaRef {
                unimplemented!()
            }

            fn output_ordering(&self) -> Option<&[OrderOption]> {
                None
            }

            fn metrics(&self) -> Option<RecordBatchMetrics> {
                None
            }
        }

        impl Stream for MaybeErrorRecordBatchStream {
            type Item = Result<RecordBatch>;

            fn poll_next(
                mut self: Pin<&mut Self>,
                _: &mut Context<'_>,
            ) -> Poll<Option<Self::Item>> {
                Poll::Ready(self.items.pop())
            }
        }

        fn new_future_stream(
            maybe_recordbatches: Result<Vec<Result<RecordBatch>>>,
        ) -> FutureStream {
            Box::pin(async move {
                maybe_recordbatches
                    .map(|items| Box::pin(MaybeErrorRecordBatchStream { items }) as _)
            })
        }

        fn mock_external_error(code: StatusCode) -> Error {
            error::ExternalSnafu.into_error(BoxedError::new(MockError::new(code)))
        }

        let schema = single_int32_schema();
        let batch1 = RecordBatch::new(
            schema.clone(),
            vec![Arc::new(Int32Vector::from_slice([1])) as _],
        )
        .unwrap();
        let batch2 = RecordBatch::new(
            schema.clone(),
            vec![Arc::new(Int32Vector::from_slice([2])) as _],
        )
        .unwrap();

        // Successful initialization: batches come back in pop order.
        let success_stream = new_future_stream(Ok(vec![Ok(batch1.clone()), Ok(batch2.clone())]));
        let adapter = AsyncRecordBatchStreamAdapter::new(schema.clone(), success_stream);
        let collected = RecordBatches::try_collect(Box::pin(adapter)).await.unwrap();
        let expected =
            RecordBatches::try_new(schema.clone(), vec![batch2.clone(), batch1.clone()]).unwrap();
        assert_eq!(collected, expected);

        // The inner stream yields an error while being polled.
        let poll_err_stream = new_future_stream(Ok(vec![
            Ok(batch1.clone()),
            Err(mock_external_error(StatusCode::Unknown)),
        ]));
        let adapter = AsyncRecordBatchStreamAdapter::new(schema.clone(), poll_err_stream);
        let err = RecordBatches::try_collect(Box::pin(adapter))
            .await
            .unwrap_err();
        assert!(
            matches!(err, Error::External { .. }),
            "unexpected err {err}"
        );

        // The initializing future itself fails.
        let failed_to_init_stream =
            new_future_stream(Err(mock_external_error(StatusCode::Internal)));
        let adapter = AsyncRecordBatchStreamAdapter::new(schema.clone(), failed_to_init_stream);
        let err = RecordBatches::try_collect(Box::pin(adapter))
            .await
            .unwrap_err();
        assert!(
            matches!(err, Error::External { .. }),
            "unexpected err {err}"
        );
    }

    #[test]
    fn test_convert_map_to_json_binary() {
        let keys = StringArray::from(vec![Some("a"), Some("b"), Some("c"), Some("x")]);
        let values = StringArray::from(vec![Some("1"), None, Some("3"), Some("42")]);
        let key_field = Arc::new(Field::new("key", ArrowDataType::Utf8, false));
        let value_field = Arc::new(Field::new("value", ArrowDataType::Utf8, true));
        let struct_type = ArrowDataType::Struct(vec![key_field, value_field].into());

        let entries_field = Arc::new(Field::new("entries", struct_type, false));

        let struct_array = StructArray::from(vec![
            (
                Arc::new(Field::new("key", ArrowDataType::Utf8, false)),
                Arc::new(keys) as ArrayRef,
            ),
            (
                Arc::new(Field::new("value", ArrowDataType::Utf8, true)),
                Arc::new(values) as ArrayRef,
            ),
        ]);

        // Three rows: three entries, a null row, one entry.
        let offsets = OffsetBuffer::from_lengths([3, 0, 1]);
        let nulls = datatypes::arrow::buffer::NullBuffer::from(vec![true, false, true]);

        let map_array = MapArray::new(entries_field, offsets, struct_array, Some(nulls), false);

        let expected_jsons = [
            Some(r#"{"a":"1","b":null,"c":"3"}"#),
            None,
            Some(r#"{"x":"42"}"#),
        ];

        // Plain JSON text output.
        let result = convert_map_to_json_binary(&map_array, None).unwrap();
        let binary_array = result
            .as_any()
            .downcast_ref::<datatypes::arrow::array::BinaryArray>()
            .unwrap();

        for (i, expected) in expected_jsons.iter().enumerate() {
            match expected {
                Some(expected) => {
                    assert!(!binary_array.is_null(i));
                    let actual_str = std::str::from_utf8(binary_array.value(i)).unwrap();
                    assert_eq!(actual_str, *expected);
                }
                None => assert!(binary_array.is_null(i)),
            }
        }

        // JSONB output must differ from the plain JSON text bytes.
        let result_json =
            convert_map_to_json_binary(&map_array, Some(ColumnExtType::Json)).unwrap();
        let binary_array_json = result_json
            .as_any()
            .downcast_ref::<datatypes::arrow::array::BinaryArray>()
            .unwrap();

        for (i, expected) in expected_jsons.iter().enumerate() {
            match expected {
                Some(plain_json) => {
                    assert!(!binary_array_json.is_null(i));
                    let actual_bytes = binary_array_json.value(i);
                    assert_ne!(actual_bytes, plain_json.as_bytes());
                }
                None => assert!(binary_array_json.is_null(i)),
            }
        }
    }

    #[test]
    fn test_record_query_stats_updates_region_counters() {
        let counters = preloaded_counters();
        let metrics = region_scan_metrics();

        record_query_stats(&counters, &metrics);

        assert_eq!(counters.query_cpu_time.load(Ordering::Relaxed), 2_000_010);
        assert_eq!(counters.query_scanned_bytes.load(Ordering::Relaxed), 62);
    }

    #[test]
    fn test_record_batch_stream_adapter_records_query_stats_on_drop() {
        let schema = single_int32_schema();
        let df_stream = Box::pin(
            datafusion::physical_plan::stream::RecordBatchStreamAdapter::new(
                schema.arrow_schema().clone(),
                futures::stream::empty::<datafusion::error::Result<DfRecordBatch>>(),
            ),
        );
        let counters = preloaded_counters();
        let adapter = RecordBatchStreamAdapter {
            schema,
            stream: df_stream,
            metrics: None,
            metrics_2: Metrics::Resolved(region_scan_metrics()),
            query_load_region_id: None,
            query_stat_counters: Some(counters.clone()),
            explain_verbose: false,
            span: Span::current(),
        };

        drop(adapter);

        assert_eq!(counters.query_cpu_time.load(Ordering::Relaxed), 2_000_010);
        assert_eq!(counters.query_scanned_bytes.load(Ordering::Relaxed), 62);
    }

    #[test]
    fn test_recordbatch_metrics_deserializes_without_region_watermarks() {
        let payload = json!({
            "elapsed_compute": 12,
            "memory_usage": 34,
            "plan_metrics": []
        });
        let metrics: RecordBatchMetrics = serde_json::from_value(payload).unwrap();

        assert!(metrics.region_watermarks.is_empty());
        assert_eq!(metrics.elapsed_compute, 12);
        assert_eq!(metrics.memory_usage, 34);
    }

    #[test]
    fn test_plan_metrics_deserializes_without_plan_name() {
        let payload = json!({
            "elapsed_compute": 12,
            "memory_usage": 34,
            "plan_metrics": [{
                "plan": "SeqScan: region=1",
                "level": 0,
                "metrics": []
            }]
        });
        let metrics: RecordBatchMetrics = serde_json::from_value(payload).unwrap();

        assert_eq!(metrics.plan_metrics[0].plan_name, "");
    }

    #[test]
    fn test_recordbatch_metrics_region_watermarks_serde_roundtrip() {
        let with_watermark = RegionWatermarkEntry {
            region_id: 1,
            watermark: Some(100),
        };
        let without_watermark = RegionWatermarkEntry {
            region_id: 2,
            watermark: None,
        };
        let metrics = RecordBatchMetrics {
            region_watermarks: vec![with_watermark, without_watermark],
            ..Default::default()
        };

        let value = serde_json::to_value(&metrics).unwrap();
        assert_eq!(
            value.get("region_watermarks").unwrap(),
            &json!([
                { "region_id": 1, "watermark": 100 },
                { "region_id": 2 }
            ])
        );

        let decoded: RecordBatchMetrics = serde_json::from_value(value).unwrap();
        assert_eq!(decoded.region_watermarks, metrics.region_watermarks);
    }

    #[test]
    fn test_recordbatch_metrics_skips_empty_region_watermarks_on_serialize() {
        let value = serde_json::to_value(RecordBatchMetrics::default()).unwrap();
        assert!(value.get("region_watermarks").is_none());
    }
}