Skip to main content

common_recordbatch/
adapter.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::fmt::{self, Display};
16use std::future::Future;
17use std::marker::PhantomData;
18use std::pin::Pin;
19use std::str::FromStr;
20use std::sync::Arc;
21use std::sync::atomic::{AtomicU64, Ordering};
22use std::task::{Context, Poll};
23
24use common_base::readable_size::ReadableSize;
25use common_telemetry::tracing::{Span, info_span};
26use common_time::util::format_nanoseconds_human_readable;
27use datafusion::arrow::compute::cast;
28use datafusion::arrow::datatypes::SchemaRef as DfSchemaRef;
29use datafusion::error::Result as DfResult;
30use datafusion::execution::context::ExecutionProps;
31use datafusion::logical_expr::Expr;
32use datafusion::logical_expr::utils::conjunction;
33use datafusion::physical_expr::create_physical_expr;
34use datafusion::physical_plan::metrics::{BaselineMetrics, MetricValue};
35use datafusion::physical_plan::{
36    DisplayFormatType, ExecutionPlan, ExecutionPlanVisitor, PhysicalExpr,
37    RecordBatchStream as DfRecordBatchStream, accept,
38};
39use datafusion_common::arrow::error::ArrowError;
40use datafusion_common::{DataFusionError, ToDFSchema};
41use datatypes::arrow::array::Array;
42use datatypes::arrow::datatypes::DataType as ArrowDataType;
43use datatypes::schema::{ColumnExtType, Schema, SchemaRef};
44use futures::ready;
45use jsonb;
46use pin_project::pin_project;
47use snafu::ResultExt;
48
49use crate::error::{self, Result};
50use crate::filter::batch_filter;
51use crate::{
52    DfRecordBatch, DfSendableRecordBatchStream, OrderOption, RecordBatch, RecordBatchStream,
53    SendableRecordBatchStream, Stream,
54};
55
56const REGION_SCAN_EXEC_NAME: &str = "RegionScanExec";
57
58type FutureStream =
59    Pin<Box<dyn std::future::Future<Output = Result<SendableRecordBatchStream>> + Send>>;
60
/// Adapts a stream of DataFusion [`DfRecordBatch`]es to `projected_schema`.
///
/// Each batch pulled from `stream` is projected by `projection`. Every projected
/// column is then cast to the data type of the matching field in
/// `projected_schema` (see `custom_cast`). If a predicate was installed with
/// `with_filter`, the resulting rows are filtered by it. Errors from `stream`
/// (of type `E`) are surfaced as `DataFusionError::External`.
#[pin_project]
pub struct RecordBatchStreamTypeAdapter<T, E> {
    /// Upstream batch stream; pinned because it is polled through the pin projection.
    #[pin]
    stream: T,
    /// Target schema that every emitted batch conforms to.
    projected_schema: DfSchemaRef,
    /// Column indices taken from each incoming batch, in output order.
    projection: Vec<usize>,
    /// Optional row filter evaluated against the projected, cast batch.
    predicate: Option<Arc<dyn PhysicalExpr>>,
    /// Marker for the upstream error type `E`; no value is stored.
    phantom: PhantomData<E>,
}
71
72impl<T, E> RecordBatchStreamTypeAdapter<T, E>
73where
74    T: Stream<Item = std::result::Result<DfRecordBatch, E>>,
75    E: std::error::Error + Send + Sync + 'static,
76{
77    pub fn new(projected_schema: DfSchemaRef, stream: T, projection: Option<Vec<usize>>) -> Self {
78        let projection = if let Some(projection) = projection {
79            projection
80        } else {
81            (0..projected_schema.fields().len()).collect()
82        };
83
84        Self {
85            stream,
86            projected_schema,
87            projection,
88            predicate: None,
89            phantom: Default::default(),
90        }
91    }
92
93    pub fn with_filter(mut self, filters: Vec<Expr>) -> Result<Self> {
94        let filters = if let Some(expr) = conjunction(filters) {
95            let df_schema = self
96                .projected_schema
97                .clone()
98                .to_dfschema_ref()
99                .context(error::PhysicalExprSnafu)?;
100
101            let filters = create_physical_expr(&expr, &df_schema, &ExecutionProps::new())
102                .context(error::PhysicalExprSnafu)?;
103            Some(filters)
104        } else {
105            None
106        };
107        self.predicate = filters;
108        Ok(self)
109    }
110}
111
112impl<T, E> DfRecordBatchStream for RecordBatchStreamTypeAdapter<T, E>
113where
114    T: Stream<Item = std::result::Result<DfRecordBatch, E>>,
115    E: std::error::Error + Send + Sync + 'static,
116{
117    fn schema(&self) -> DfSchemaRef {
118        self.projected_schema.clone()
119    }
120}
121
122impl<T, E> Stream for RecordBatchStreamTypeAdapter<T, E>
123where
124    T: Stream<Item = std::result::Result<DfRecordBatch, E>>,
125    E: std::error::Error + Send + Sync + 'static,
126{
127    type Item = DfResult<DfRecordBatch>;
128
129    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
130        let this = self.project();
131
132        let batch = futures::ready!(this.stream.poll_next(cx))
133            .map(|r| r.map_err(|e| DataFusionError::External(Box::new(e))));
134
135        let projected_schema = this.projected_schema.clone();
136        let projection = this.projection.clone();
137        let predicate = this.predicate.clone();
138
139        let batch = batch.map(|b| {
140            b.and_then(|b| {
141                let projected_column = b.project(&projection)?;
142                if projected_column.schema().fields.len() != projected_schema.fields.len() {
143                   return Err(DataFusionError::ArrowError(Box::new(ArrowError::SchemaError(format!(
144                        "Trying to cast a RecordBatch into an incompatible schema. RecordBatch: {}, Target: {}",
145                        projected_column.schema(),
146                        projected_schema,
147                    ))), None));
148                }
149
150                let mut columns = Vec::with_capacity(projected_schema.fields.len());
151                for (idx,field) in projected_schema.fields.iter().enumerate() {
152                    let column = projected_column.column(idx);
153                    let extype = field.metadata().get("greptime:type").and_then(|s| ColumnExtType::from_str(s).ok());
154                    let output = custom_cast(&column, field.data_type(), extype)?;
155                    columns.push(output)
156                }
157                let record_batch = DfRecordBatch::try_new(projected_schema, columns)?;
158                let record_batch = if let Some(predicate) = predicate {
159                    batch_filter(&record_batch, &predicate)?
160                } else {
161                    record_batch
162                };
163                Ok(record_batch)
164            })
165        });
166
167        Poll::Ready(batch)
168    }
169
170    #[inline]
171    fn size_hint(&self) -> (usize, Option<usize>) {
172        self.stream.size_hint()
173    }
174}
175
176/// Greptime SendableRecordBatchStream -> DataFusion RecordBatchStream.
177/// The reverse one is [RecordBatchStreamAdapter].
178pub struct DfRecordBatchStreamAdapter {
179    stream: SendableRecordBatchStream,
180}
181
182impl DfRecordBatchStreamAdapter {
183    pub fn new(stream: SendableRecordBatchStream) -> Self {
184        Self { stream }
185    }
186}
187
188impl DfRecordBatchStream for DfRecordBatchStreamAdapter {
189    fn schema(&self) -> DfSchemaRef {
190        self.stream.schema().arrow_schema().clone()
191    }
192}
193
194impl Stream for DfRecordBatchStreamAdapter {
195    type Item = DfResult<DfRecordBatch>;
196
197    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
198        match Pin::new(&mut self.stream).poll_next(cx) {
199            Poll::Pending => Poll::Pending,
200            Poll::Ready(Some(recordbatch)) => match recordbatch {
201                Ok(recordbatch) => Poll::Ready(Some(Ok(recordbatch.into_df_record_batch()))),
202                Err(e) => Poll::Ready(Some(Err(DataFusionError::External(Box::new(e))))),
203            },
204            Poll::Ready(None) => Poll::Ready(None),
205        }
206    }
207
208    #[inline]
209    fn size_hint(&self) -> (usize, Option<usize>) {
210        self.stream.size_hint()
211    }
212}
213
/// DataFusion [SendableRecordBatchStream](DfSendableRecordBatchStream) -> Greptime [RecordBatchStream].
/// The reverse one is [DfRecordBatchStreamAdapter].
/// It can collect metrics from DataFusion execution plan.
pub struct RecordBatchStreamAdapter {
    /// Greptime schema converted from the DataFusion stream's schema at construction.
    schema: SchemaRef,
    /// The wrapped DataFusion stream.
    stream: DfSendableRecordBatchStream,
    /// Optional baseline metrics; when set, each `poll_next` call is timed into
    /// its `elapsed_compute`.
    metrics: Option<BaselineMetrics>,
    /// Aggregated plan-level metrics. Resolved after an [ExecutionPlan] is finished.
    /// They are refreshed after every yielded batch and finalized when the stream ends.
    metrics_2: Metrics,
    /// Region id stamped into collected metrics as `query_load_region_id`.
    query_load_region_id: Option<u64>,
    /// Region counters that receive CPU time and scanned bytes when the adapter is dropped.
    query_stat_counters: Option<RegionQueryStatCounters>,
    /// Display plan and metrics in verbose mode.
    explain_verbose: bool,
    /// Tracing span used as the parent of the per-call `poll_next` spans.
    span: Span,
}
229
/// Query statistic counters owned by a region.
///
/// Both counters are `Arc`-shared, so clones of this struct update the same totals.
/// `RecordBatchStreamAdapter` adds to them (with `Ordering::Relaxed`) when it is dropped.
#[derive(Debug, Clone)]
pub struct RegionQueryStatCounters {
    /// The total query CPU time in nanoseconds.
    pub query_cpu_time: Arc<AtomicU64>,
    /// The total scanned bytes (sum of `output_bytes` of region scan nodes).
    pub query_scanned_bytes: Arc<AtomicU64>,
}
238
/// Plan-level metrics state of a [RecordBatchStreamAdapter], covering the whole
/// plan tree. The metrics are held as [RecordBatchMetrics] values, not as JSON.
enum Metrics {
    /// No execution plan is attached; nothing is collected.
    Unavailable,
    /// A plan is attached but no metrics have been collected from it yet.
    Unresolved(Arc<dyn ExecutionPlan>),
    /// Metrics collected while the stream is still yielding batches. The plan is
    /// kept so the metrics can be refreshed.
    PartialResolved(Arc<dyn ExecutionPlan>, RecordBatchMetrics),
    /// Final metrics collected once the stream reported its end.
    Resolved(RecordBatchMetrics),
}
246
247impl RecordBatchStreamAdapter {
248    pub fn try_new(stream: DfSendableRecordBatchStream) -> Result<Self> {
249        let schema =
250            Arc::new(Schema::try_from(stream.schema()).context(error::SchemaConversionSnafu)?);
251        Ok(Self {
252            schema,
253            stream,
254            metrics: None,
255            metrics_2: Metrics::Unavailable,
256            query_load_region_id: None,
257            query_stat_counters: None,
258            explain_verbose: false,
259            span: Span::current(),
260        })
261    }
262
263    pub fn try_new_with_span(stream: DfSendableRecordBatchStream, span: Span) -> Result<Self> {
264        let schema =
265            Arc::new(Schema::try_from(stream.schema()).context(error::SchemaConversionSnafu)?);
266        let subspan = info_span!(parent: &span, "RecordBatchStreamAdapter");
267        Ok(Self {
268            schema,
269            stream,
270            metrics: None,
271            metrics_2: Metrics::Unavailable,
272            query_load_region_id: None,
273            query_stat_counters: None,
274            explain_verbose: false,
275            span: subspan,
276        })
277    }
278
279    pub fn set_metrics2(&mut self, plan: Arc<dyn ExecutionPlan>) {
280        self.metrics_2 = Metrics::Unresolved(plan)
281    }
282
283    fn record_query_stats_on_drop(&self) {
284        let Some(counters) = &self.query_stat_counters else {
285            return;
286        };
287
288        match &self.metrics_2 {
289            Metrics::Unresolved(df_plan) | Metrics::PartialResolved(df_plan, _) => {
290                let mut metric_collector = MetricCollector::new(self.explain_verbose);
291                accept(df_plan.as_ref(), &mut metric_collector).unwrap();
292                metric_collector.record_batch_metrics.query_load_region_id =
293                    self.query_load_region_id;
294                record_query_stats(counters, &metric_collector.record_batch_metrics);
295            }
296            Metrics::Resolved(metrics) => record_query_stats(counters, metrics),
297            Metrics::Unavailable => {}
298        }
299    }
300
301    pub fn set_query_load_region_id(&mut self, region_id: Option<u64>) {
302        self.query_load_region_id = region_id;
303    }
304
305    pub fn set_query_stat_counters(&mut self, counters: Option<RegionQueryStatCounters>) {
306        self.query_stat_counters = counters;
307    }
308
309    /// Set the verbose mode for displaying plan and metrics.
310    pub fn set_explain_verbose(&mut self, verbose: bool) {
311        self.explain_verbose = verbose;
312    }
313}
314
315/// Extracts total `output_bytes` from region scan plan nodes.
316pub fn region_scan_output_bytes(metrics: &RecordBatchMetrics) -> usize {
317    metrics
318        .plan_metrics
319        .iter()
320        .filter(|pm| pm.plan_name == REGION_SCAN_EXEC_NAME)
321        .flat_map(|pm| &pm.metrics)
322        .filter_map(|(name, value)| (name == "output_bytes").then_some(*value))
323        .sum()
324}
325
326fn record_query_stats(counters: &RegionQueryStatCounters, metrics: &RecordBatchMetrics) {
327    counters
328        .query_cpu_time
329        .fetch_add(metrics.elapsed_compute as u64, Ordering::Relaxed);
330    counters
331        .query_scanned_bytes
332        .fetch_add(region_scan_output_bytes(metrics) as u64, Ordering::Relaxed);
333}
334
335impl RecordBatchStream for RecordBatchStreamAdapter {
336    fn name(&self) -> &str {
337        "RecordBatchStreamAdapter"
338    }
339
340    fn schema(&self) -> SchemaRef {
341        self.schema.clone()
342    }
343
344    fn metrics(&self) -> Option<RecordBatchMetrics> {
345        match &self.metrics_2 {
346            Metrics::Resolved(metrics) | Metrics::PartialResolved(_, metrics) => {
347                Some(metrics.clone())
348            }
349            Metrics::Unavailable | Metrics::Unresolved(_) => None,
350        }
351    }
352
353    fn output_ordering(&self) -> Option<&[OrderOption]> {
354        None
355    }
356}
357
358impl Stream for RecordBatchStreamAdapter {
359    type Item = Result<RecordBatch>;
360
361    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
362        let timer = self
363            .metrics
364            .as_ref()
365            .map(|m| m.elapsed_compute().clone())
366            .unwrap_or_default();
367        let _guard = timer.timer();
368        let poll_span = info_span!(parent: &self.span, "poll_next");
369        let _entered = poll_span.enter();
370        match Pin::new(&mut self.stream).poll_next(cx) {
371            Poll::Pending => Poll::Pending,
372            Poll::Ready(Some(df_record_batch)) => {
373                let df_record_batch = df_record_batch?;
374                if let Metrics::Unresolved(df_plan) | Metrics::PartialResolved(df_plan, _) =
375                    &self.metrics_2
376                {
377                    let mut metric_collector = MetricCollector::new(self.explain_verbose);
378                    accept(df_plan.as_ref(), &mut metric_collector).unwrap();
379                    metric_collector.record_batch_metrics.query_load_region_id =
380                        self.query_load_region_id;
381                    self.metrics_2 = Metrics::PartialResolved(
382                        df_plan.clone(),
383                        metric_collector.record_batch_metrics,
384                    );
385                }
386                Poll::Ready(Some(Ok(RecordBatch::from_df_record_batch(
387                    self.schema(),
388                    df_record_batch,
389                ))))
390            }
391            Poll::Ready(None) => {
392                if let Metrics::Unresolved(df_plan) | Metrics::PartialResolved(df_plan, _) =
393                    &self.metrics_2
394                {
395                    let mut metric_collector = MetricCollector::new(self.explain_verbose);
396                    accept(df_plan.as_ref(), &mut metric_collector).unwrap();
397                    metric_collector.record_batch_metrics.query_load_region_id =
398                        self.query_load_region_id;
399                    self.metrics_2 = Metrics::Resolved(metric_collector.record_batch_metrics);
400                }
401                Poll::Ready(None)
402            }
403        }
404    }
405
406    #[inline]
407    fn size_hint(&self) -> (usize, Option<usize>) {
408        self.stream.size_hint()
409    }
410}
411
412impl Drop for RecordBatchStreamAdapter {
413    fn drop(&mut self) {
414        self.record_query_stats_on_drop();
415    }
416}
417
418/// An [ExecutionPlanVisitor] to collect metrics from a [ExecutionPlan].
419pub struct MetricCollector {
420    current_level: usize,
421    pub record_batch_metrics: RecordBatchMetrics,
422    verbose: bool,
423}
424
425impl MetricCollector {
426    pub fn new(verbose: bool) -> Self {
427        Self {
428            current_level: 0,
429            record_batch_metrics: RecordBatchMetrics::default(),
430            verbose,
431        }
432    }
433}
434
435impl ExecutionPlanVisitor for MetricCollector {
436    type Error = !;
437
438    fn pre_visit(&mut self, plan: &dyn ExecutionPlan) -> std::result::Result<bool, Self::Error> {
439        // skip if no metric available
440        let Some(metric) = plan.metrics() else {
441            self.record_batch_metrics.plan_metrics.push(PlanMetrics {
442                plan: plan.name().to_string(),
443                plan_name: plan.name().to_string(),
444                level: self.current_level,
445                metrics: vec![],
446            });
447            self.current_level += 1;
448            return Ok(true);
449        };
450
451        // scrape plan metrics
452        let metric = metric
453            .aggregate_by_name()
454            .sorted_for_display()
455            .timestamps_removed();
456        let mut plan_metric = PlanMetrics {
457            plan: one_line(plan, self.verbose).to_string(),
458            plan_name: plan.name().to_string(),
459            level: self.current_level,
460            metrics: Vec::with_capacity(metric.iter().size_hint().0),
461        };
462        for m in metric.iter() {
463            plan_metric
464                .metrics
465                .push((m.value().name().to_string(), m.value().as_usize()));
466
467            // aggregate high-level metrics
468            match m.value() {
469                MetricValue::ElapsedCompute(ec) => {
470                    self.record_batch_metrics.elapsed_compute += ec.value()
471                }
472                MetricValue::CurrentMemoryUsage(m) => {
473                    self.record_batch_metrics.memory_usage += m.value()
474                }
475                _ => {}
476            }
477        }
478        self.record_batch_metrics.plan_metrics.push(plan_metric);
479
480        self.current_level += 1;
481        Ok(true)
482    }
483
484    fn post_visit(&mut self, _plan: &dyn ExecutionPlan) -> std::result::Result<bool, Self::Error> {
485        self.current_level -= 1;
486        Ok(true)
487    }
488}
489
490/// Returns a single-line summary of the root of the plan.
491/// If the `verbose` flag is set, it will display detailed information about the plan.
492fn one_line(plan: &dyn ExecutionPlan, verbose: bool) -> impl fmt::Display + '_ {
493    struct Wrapper<'a> {
494        plan: &'a dyn ExecutionPlan,
495        format_type: DisplayFormatType,
496    }
497
498    impl fmt::Display for Wrapper<'_> {
499        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
500            self.plan.fmt_as(self.format_type, f)?;
501            writeln!(f)
502        }
503    }
504
505    let format_type = if verbose {
506        DisplayFormatType::Verbose
507    } else {
508        DisplayFormatType::Default
509    };
510    Wrapper { plan, format_type }
511}
512
/// [`RecordBatchMetrics`] carries metric values
/// from datanode to frontend through gRPC.
#[derive(serde::Serialize, serde::Deserialize, Default, Debug, Clone)]
pub struct RecordBatchMetrics {
    // High-level aggregated metrics
    /// CPU consumption in nanoseconds, summed over the elapsed-compute metrics of all plan nodes.
    pub elapsed_compute: usize,
    /// Memory used by the plan in bytes, summed over the current-memory-usage metrics of all plan nodes.
    pub memory_usage: usize,
    // Detailed per-plan metrics
    /// An ordered list of plan metrics, from top to bottom. When produced by
    /// [MetricCollector] the order is pre-order: each plan precedes its children,
    /// and `level` gives its depth.
    pub plan_metrics: Vec<PlanMetrics>,
    /// Region id that should receive query-load metrics for this scan.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub query_load_region_id: Option<u64>,
    /// Per-region watermark for incremental-read checkpoint advancement.
    ///
    /// The watermark is the latest sequence (`seq`) this query round safely read
    /// for each participating region. Flow uses it to decide where the next
    /// incremental round can resume.
    ///
    /// - `Some(seq)`: the query proved it safely read up to `seq`; downstream
    ///   may advance the checkpoint to this value.
    /// - `None`: the region participated but the query could not prove a safe
    ///   read upper-bound, so the checkpoint must not advance for this region.
    ///
    /// Omitted when empty for backward compatibility.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub region_watermarks: Vec<RegionWatermarkEntry>,
}
543
/// Watermark of one region that participated in a query
/// (an element of `RecordBatchMetrics::region_watermarks`).
///
/// The derived ordering compares `region_id` first, then `watermark`.
#[derive(serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub struct RegionWatermarkEntry {
    /// Id of the participating region.
    pub region_id: u64,
    /// Latest sequence the query safely read for this region, or `None` when no
    /// safe upper bound could be proven (the checkpoint must not advance).
    /// Omitted from the serialized form when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub watermark: Option<u64>,
}
550
/// Returns `true` when `metric_name` mentions `elapsed`, `time` or `cost`. Such
/// values are shown as human-readable durations.
fn is_time_metric(metric_name: &str) -> bool {
    ["elapsed", "time", "cost"]
        .iter()
        .any(|&marker| metric_name.contains(marker))
}
555
/// Returns `true` when `metric_name` mentions `bytes` or `mem`. Such values are
/// shown as human-readable sizes.
fn is_bytes_metric(metric_name: &str) -> bool {
    const MARKERS: [&str; 2] = ["bytes", "mem"];
    MARKERS
        .iter()
        .copied()
        .any(|marker| metric_name.contains(marker))
}
560
561fn format_bytes_human_readable(bytes: usize) -> String {
562    format!("{}", ReadableSize(bytes as u64))
563}
564
565/// Only display `plan_metrics` with indent `  ` (2 spaces).
566impl Display for RecordBatchMetrics {
567    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
568        for metric in &self.plan_metrics {
569            write!(
570                f,
571                "{:indent$}{} metrics=[",
572                " ",
573                metric.plan.trim_end(),
574                indent = metric.level * 2,
575            )?;
576            for (label, value) in &metric.metrics {
577                if is_time_metric(label) {
578                    write!(
579                        f,
580                        "{}: {}, ",
581                        label,
582                        format_nanoseconds_human_readable(*value),
583                    )?;
584                } else if is_bytes_metric(label) {
585                    write!(f, "{}: {}, ", label, format_bytes_human_readable(*value),)?;
586                } else {
587                    write!(f, "{}: {}, ", label, value)?;
588                }
589            }
590            writeln!(f, "]")?;
591        }
592
593        Ok(())
594    }
595}
596
/// Metrics of a single node of an execution plan tree.
#[derive(serde::Serialize, serde::Deserialize, Default, Debug, Clone)]
pub struct PlanMetrics {
    /// Display text of the plan node. `MetricCollector` stores the node's
    /// `fmt_as` rendering (which may end with a newline) when the node has
    /// metrics, and just the node name otherwise.
    pub plan: String,
    /// The stable execution plan name (e.g. `RegionScanExec`); defaults to an
    /// empty string when missing from serialized input.
    #[serde(default)]
    pub plan_name: String,
    /// The level (depth) of the plan in the tree, starting from 0 at the root.
    pub level: usize,
    /// An ordered key-value list of metrics.
    /// Key is metric label and value is metric value.
    pub metrics: Vec<(String, usize)>,
}
610
/// Lifecycle of an `AsyncRecordBatchStreamAdapter`.
enum AsyncRecordBatchStreamAdapterState {
    /// The future that creates the inner stream has not resolved yet.
    Uninit(FutureStream),
    /// The inner stream exists and is polled directly.
    Ready(SendableRecordBatchStream),
    /// Creating the inner stream failed; the error has already been yielded once.
    Failed,
}
616
617pub struct AsyncRecordBatchStreamAdapter {
618    schema: SchemaRef,
619    state: AsyncRecordBatchStreamAdapterState,
620}
621
622impl AsyncRecordBatchStreamAdapter {
623    pub fn new(schema: SchemaRef, stream: FutureStream) -> Self {
624        Self {
625            schema,
626            state: AsyncRecordBatchStreamAdapterState::Uninit(stream),
627        }
628    }
629}
630
631impl RecordBatchStream for AsyncRecordBatchStreamAdapter {
632    fn schema(&self) -> SchemaRef {
633        self.schema.clone()
634    }
635
636    fn output_ordering(&self) -> Option<&[OrderOption]> {
637        None
638    }
639
640    fn metrics(&self) -> Option<RecordBatchMetrics> {
641        None
642    }
643}
644
645impl Stream for AsyncRecordBatchStreamAdapter {
646    type Item = Result<RecordBatch>;
647
648    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
649        loop {
650            match &mut self.state {
651                AsyncRecordBatchStreamAdapterState::Uninit(stream_future) => {
652                    match ready!(Pin::new(stream_future).poll(cx)) {
653                        Ok(stream) => {
654                            self.state = AsyncRecordBatchStreamAdapterState::Ready(stream);
655                            continue;
656                        }
657                        Err(e) => {
658                            self.state = AsyncRecordBatchStreamAdapterState::Failed;
659                            return Poll::Ready(Some(Err(e)));
660                        }
661                    };
662                }
663                AsyncRecordBatchStreamAdapterState::Ready(stream) => {
664                    return Poll::Ready(ready!(Pin::new(stream).poll_next(cx)));
665                }
666                AsyncRecordBatchStreamAdapterState::Failed => return Poll::Ready(None),
667            }
668        }
669    }
670
671    // This is not supported for lazy stream.
672    #[inline]
673    fn size_hint(&self) -> (usize, Option<usize>) {
674        (0, None)
675    }
676}
677
678/// Custom cast function that handles Map -> Binary (JSON) conversion
679fn custom_cast(
680    array: &dyn Array,
681    target_type: &ArrowDataType,
682    extype: Option<ColumnExtType>,
683) -> std::result::Result<Arc<dyn Array>, ArrowError> {
684    if let ArrowDataType::Map(_, _) = array.data_type()
685        && let ArrowDataType::Binary = target_type
686    {
687        return convert_map_to_json_binary(array, extype);
688    }
689
690    cast(array, target_type)
691}
692
693/// Convert a Map array to a Binary array containing JSON data
694fn convert_map_to_json_binary(
695    array: &dyn Array,
696    extype: Option<ColumnExtType>,
697) -> std::result::Result<Arc<dyn Array>, ArrowError> {
698    use datatypes::arrow::array::{BinaryArray, MapArray};
699    use serde_json::Value;
700
701    let map_array = array
702        .as_any()
703        .downcast_ref::<MapArray>()
704        .ok_or_else(|| ArrowError::CastError("Failed to downcast to MapArray".to_string()))?;
705
706    let mut json_values = Vec::with_capacity(map_array.len());
707
708    for i in 0..map_array.len() {
709        if map_array.is_null(i) {
710            json_values.push(None);
711        } else {
712            // Extract the map entry at index i
713            let map_entry = map_array.value(i);
714            let key_value_array = map_entry
715                .as_any()
716                .downcast_ref::<datatypes::arrow::array::StructArray>()
717                .ok_or_else(|| {
718                    ArrowError::CastError("Failed to downcast to StructArray".to_string())
719                })?;
720
721            // Convert to JSON object
722            let mut json_obj = serde_json::Map::with_capacity(key_value_array.len());
723
724            for j in 0..key_value_array.len() {
725                if key_value_array.is_null(j) {
726                    continue;
727                }
728                let key_field = key_value_array.column(0);
729                let value_field = key_value_array.column(1);
730
731                if key_field.is_null(j) {
732                    continue;
733                }
734
735                let key = key_field
736                    .as_any()
737                    .downcast_ref::<datatypes::arrow::array::StringArray>()
738                    .ok_or_else(|| {
739                        ArrowError::CastError("Failed to downcast key to StringArray".to_string())
740                    })?
741                    .value(j);
742
743                let value = if value_field.is_null(j) {
744                    Value::Null
745                } else {
746                    let value_str = value_field
747                        .as_any()
748                        .downcast_ref::<datatypes::arrow::array::StringArray>()
749                        .ok_or_else(|| {
750                            ArrowError::CastError(
751                                "Failed to downcast value to StringArray".to_string(),
752                            )
753                        })?
754                        .value(j);
755                    Value::String(value_str.to_string())
756                };
757
758                json_obj.insert(key.to_string(), value);
759            }
760
761            let json_value = Value::Object(json_obj);
762            let json_bytes = match extype {
763                Some(ColumnExtType::Json) => {
764                    let json_string = match serde_json::to_string(&json_value) {
765                        Ok(s) => s,
766                        Err(e) => {
767                            return Err(ArrowError::CastError(format!(
768                                "Failed to serialize JSON: {}",
769                                e
770                            )));
771                        }
772                    };
773                    match jsonb::parse_value(json_string.as_bytes()) {
774                        Ok(jsonb_value) => jsonb_value.to_vec(),
775                        Err(e) => {
776                            return Err(ArrowError::CastError(format!(
777                                "Failed to serialize JSONB: {}",
778                                e
779                            )));
780                        }
781                    }
782                }
783                _ => match serde_json::to_vec(&json_value) {
784                    Ok(b) => b,
785                    Err(e) => {
786                        return Err(ArrowError::CastError(format!(
787                            "Failed to serialize JSON: {}",
788                            e
789                        )));
790                    }
791                },
792            };
793            json_values.push(Some(json_bytes));
794        }
795    }
796
797    let binary_array = BinaryArray::from_iter(json_values);
798    Ok(Arc::new(binary_array))
799}
800
#[cfg(test)]
mod test {
    use common_error::ext::BoxedError;
    use common_error::mock::MockError;
    use common_error::status_code::StatusCode;
    use datatypes::arrow::array::{ArrayRef, MapArray, StringArray, StructArray};
    use datatypes::arrow::buffer::OffsetBuffer;
    use datatypes::arrow::datatypes::Field;
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::ColumnSchema;
    use datatypes::vectors::Int32Vector;
    use serde_json::json;
    use snafu::IntoError;

    use super::*;
    use crate::RecordBatches;
    use crate::error::Error;

    /// Builds the single-column schema (`a: int32`, not nullable) shared by the stream tests.
    fn int32_schema() -> SchemaRef {
        Arc::new(Schema::new(vec![ColumnSchema::new(
            "a",
            ConcreteDataType::int32_datatype(),
            false,
        )]))
    }

    /// Returns counters pre-seeded with non-zero totals (cpu time 10, scanned bytes 20),
    /// so the tests can tell that recording stats adds to existing values.
    fn seeded_region_counters() -> RegionQueryStatCounters {
        RegionQueryStatCounters {
            query_cpu_time: Arc::new(AtomicU64::new(10)),
            query_scanned_bytes: Arc::new(AtomicU64::new(20)),
        }
    }

    /// Returns metrics for a single `RegionScanExec` plan node with
    /// `elapsed_compute = 2_000_000` and an `output_bytes` metric of 42.
    fn region_scan_metrics() -> RecordBatchMetrics {
        RecordBatchMetrics {
            elapsed_compute: 2_000_000,
            plan_metrics: vec![PlanMetrics {
                plan: "RegionScanExec: region=1".to_string(),
                plan_name: REGION_SCAN_EXEC_NAME.to_string(),
                level: 0,
                metrics: vec![("output_bytes".to_string(), 42)],
            }],
            ..Default::default()
        }
    }

    /// Asserts that `counters` hold the seeded totals from [`seeded_region_counters`]
    /// plus the values from [`region_scan_metrics`]: cpu time `10 + 2_000_000` and
    /// scanned bytes `20 + 42`.
    #[track_caller]
    fn assert_region_scan_recorded(counters: &RegionQueryStatCounters) {
        assert_eq!(counters.query_cpu_time.load(Ordering::Relaxed), 2_000_010);
        assert_eq!(counters.query_scanned_bytes.load(Ordering::Relaxed), 62);
    }

    #[tokio::test]
    async fn test_async_recordbatch_stream_adaptor() {
        // Test stream that yields pre-built items (batches or errors) as-is.
        struct MaybeErrorRecordBatchStream {
            items: Vec<Result<RecordBatch>>,
        }

        impl RecordBatchStream for MaybeErrorRecordBatchStream {
            fn schema(&self) -> SchemaRef {
                unimplemented!()
            }

            fn output_ordering(&self) -> Option<&[OrderOption]> {
                None
            }

            fn metrics(&self) -> Option<RecordBatchMetrics> {
                None
            }
        }

        impl Stream for MaybeErrorRecordBatchStream {
            type Item = Result<RecordBatch>;

            fn poll_next(
                mut self: Pin<&mut Self>,
                _: &mut Context<'_>,
            ) -> Poll<Option<Self::Item>> {
                // Items are popped from the back, so they come out in reverse order;
                // errors pass through unchanged.
                Poll::Ready(self.items.pop())
            }
        }

        // Wraps either the stream items or a stream-creation error into a `FutureStream`.
        fn new_future_stream(
            maybe_recordbatches: Result<Vec<Result<RecordBatch>>>,
        ) -> FutureStream {
            Box::pin(async move {
                maybe_recordbatches
                    .map(|items| Box::pin(MaybeErrorRecordBatchStream { items }) as _)
            })
        }

        let schema = int32_schema();
        let batch1 = RecordBatch::new(
            schema.clone(),
            vec![Arc::new(Int32Vector::from_slice([1])) as _],
        )
        .unwrap();
        let batch2 = RecordBatch::new(
            schema.clone(),
            vec![Arc::new(Int32Vector::from_slice([2])) as _],
        )
        .unwrap();

        // All items succeed: the batches arrive in reverse (pop) order.
        let success_stream = new_future_stream(Ok(vec![Ok(batch1.clone()), Ok(batch2.clone())]));
        let adapter = AsyncRecordBatchStreamAdapter::new(schema.clone(), success_stream);
        let collected = RecordBatches::try_collect(Box::pin(adapter)).await.unwrap();
        assert_eq!(
            collected,
            RecordBatches::try_new(schema.clone(), vec![batch2.clone(), batch1.clone()]).unwrap()
        );

        // An error yielded while polling must surface from the adapter.
        let poll_err_stream = new_future_stream(Ok(vec![
            Ok(batch1.clone()),
            Err(error::ExternalSnafu
                .into_error(BoxedError::new(MockError::new(StatusCode::Unknown)))),
        ]));
        let adapter = AsyncRecordBatchStreamAdapter::new(schema.clone(), poll_err_stream);
        let err = RecordBatches::try_collect(Box::pin(adapter))
            .await
            .unwrap_err();
        assert!(
            matches!(err, Error::External { .. }),
            "unexpected err {err}"
        );

        // An error while creating the inner stream must surface from the adapter too.
        let failed_to_init_stream =
            new_future_stream(Err(error::ExternalSnafu
                .into_error(BoxedError::new(MockError::new(StatusCode::Internal)))));
        let adapter = AsyncRecordBatchStreamAdapter::new(schema.clone(), failed_to_init_stream);
        let err = RecordBatches::try_collect(Box::pin(adapter))
            .await
            .unwrap_err();
        assert!(
            matches!(err, Error::External { .. }),
            "unexpected err {err}"
        );
    }

    #[test]
    fn test_convert_map_to_json_binary() {
        let key_field = Arc::new(Field::new("key", ArrowDataType::Utf8, false));
        let value_field = Arc::new(Field::new("value", ArrowDataType::Utf8, true));

        // Flattened map entries for every row; `offsets` below slices them per row.
        let keys: ArrayRef = Arc::new(StringArray::from(vec![
            Some("a"),
            Some("b"),
            Some("c"),
            Some("x"),
        ]));
        let values: ArrayRef =
            Arc::new(StringArray::from(vec![Some("1"), None, Some("3"), Some("42")]));
        let struct_array = StructArray::from(vec![
            (Arc::clone(&key_field), keys),
            (Arc::clone(&value_field), values),
        ]);
        let entries_field = Arc::new(Field::new(
            "entries",
            ArrowDataType::Struct(vec![key_field, value_field].into()),
            false,
        ));

        // Row 0 owns entries a..c, row 1 is an empty null map, row 2 owns x.
        let offsets = OffsetBuffer::from_lengths([3, 0, 1]);
        let nulls = datatypes::arrow::buffer::NullBuffer::from(vec![true, false, true]);

        let map_array = MapArray::new(
            entries_field,
            offsets,
            struct_array,
            Some(nulls), // nulls
            false,
        );

        let expected_jsons = [
            Some(r#"{"a":"1","b":null,"c":"3"}"#),
            None,
            Some(r#"{"x":"42"}"#),
        ];

        // Without an extension type the map is rendered as textual JSON.
        let result = convert_map_to_json_binary(&map_array, None).unwrap();
        let binary_array = result
            .as_any()
            .downcast_ref::<datatypes::arrow::array::BinaryArray>()
            .unwrap();
        assert_eq!(binary_array.len(), expected_jsons.len());

        for (i, expected) in expected_jsons.iter().enumerate() {
            match expected {
                Some(expected) => {
                    assert!(!binary_array.is_null(i));
                    let actual_str = std::str::from_utf8(binary_array.value(i)).unwrap();
                    assert_eq!(actual_str, *expected);
                }
                None => assert!(binary_array.is_null(i)),
            }
        }

        // With the JSON extension type the output is binary-encoded, so it must not equal
        // the textual JSON while keeping the same null positions.
        let result_json =
            convert_map_to_json_binary(&map_array, Some(ColumnExtType::Json)).unwrap();
        let binary_array_json = result_json
            .as_any()
            .downcast_ref::<datatypes::arrow::array::BinaryArray>()
            .unwrap();
        assert_eq!(binary_array_json.len(), expected_jsons.len());

        for (i, expected) in expected_jsons.iter().enumerate() {
            match expected {
                Some(expected) => {
                    assert!(!binary_array_json.is_null(i));
                    assert_ne!(binary_array_json.value(i), expected.as_bytes());
                }
                None => assert!(binary_array_json.is_null(i)),
            }
        }
    }

    #[test]
    fn test_record_query_stats_updates_region_counters() {
        let counters = seeded_region_counters();

        record_query_stats(&counters, &region_scan_metrics());

        assert_region_scan_recorded(&counters);
    }

    #[test]
    fn test_record_batch_stream_adapter_records_query_stats_on_drop() {
        let schema = int32_schema();
        let df_stream = Box::pin(
            datafusion::physical_plan::stream::RecordBatchStreamAdapter::new(
                schema.arrow_schema().clone(),
                futures::stream::empty::<datafusion::error::Result<DfRecordBatch>>(),
            ),
        );
        let counters = seeded_region_counters();
        let adapter = RecordBatchStreamAdapter {
            schema,
            stream: df_stream,
            metrics: None,
            metrics_2: Metrics::Resolved(region_scan_metrics()),
            query_load_region_id: None,
            query_stat_counters: Some(counters.clone()),
            explain_verbose: false,
            span: Span::current(),
        };

        // The adapter is never polled; dropping it alone is expected to record the stats.
        drop(adapter);

        assert_region_scan_recorded(&counters);
    }

    #[test]
    fn test_recordbatch_metrics_deserializes_without_region_watermarks() {
        let metrics: RecordBatchMetrics = serde_json::from_value(json!({
            "elapsed_compute": 12,
            "memory_usage": 34,
            "plan_metrics": []
        }))
        .unwrap();

        assert!(metrics.region_watermarks.is_empty());
        assert_eq!(metrics.elapsed_compute, 12);
        assert_eq!(metrics.memory_usage, 34);
    }

    #[test]
    fn test_plan_metrics_deserializes_without_plan_name() {
        let metrics: RecordBatchMetrics = serde_json::from_value(json!({
            "elapsed_compute": 12,
            "memory_usage": 34,
            "plan_metrics": [{
                "plan": "SeqScan: region=1",
                "level": 0,
                "metrics": []
            }]
        }))
        .unwrap();

        assert_eq!(metrics.plan_metrics[0].plan_name, "");
    }

    #[test]
    fn test_recordbatch_metrics_region_watermarks_serde_roundtrip() {
        let metrics = RecordBatchMetrics {
            region_watermarks: vec![
                RegionWatermarkEntry {
                    region_id: 1,
                    watermark: Some(100),
                },
                RegionWatermarkEntry {
                    region_id: 2,
                    watermark: None,
                },
            ],
            ..Default::default()
        };

        // A `None` watermark is expected to be omitted from the serialized entry.
        let value = serde_json::to_value(&metrics).unwrap();
        assert_eq!(
            value.get("region_watermarks").unwrap(),
            &json!([
                { "region_id": 1, "watermark": 100 },
                { "region_id": 2 }
            ])
        );

        let decoded: RecordBatchMetrics = serde_json::from_value(value).unwrap();
        assert_eq!(decoded.region_watermarks, metrics.region_watermarks);
    }

    #[test]
    fn test_recordbatch_metrics_skips_empty_region_watermarks_on_serialize() {
        let value = serde_json::to_value(RecordBatchMetrics::default()).unwrap();
        assert!(value.get("region_watermarks").is_none());
    }
}