promql/extension_plan/scalar_calculate.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::any::Any;
use std::collections::HashMap;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use datafusion::common::stats::Precision;
use datafusion::common::{DFSchema, DFSchemaRef, Result as DataFusionResult, Statistics};
use datafusion::error::DataFusionError;
use datafusion::execution::context::TaskContext;
use datafusion::logical_expr::{EmptyRelation, LogicalPlan, UserDefinedLogicalNodeCore};
use datafusion::physical_expr::EquivalenceProperties;
use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
use datafusion::physical_plan::{
    DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, Partitioning, PlanProperties,
    RecordBatchStream, SendableRecordBatchStream,
};
use datafusion::prelude::Expr;
use datafusion::sql::TableReference;
use datatypes::arrow::array::{Array, Float64Array, StringArray, TimestampMillisecondArray};
use datatypes::arrow::compute::{CastOptions, cast_with_options, concat_batches};
use datatypes::arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit};
use datatypes::arrow::record_batch::RecordBatch;
use futures::{Stream, StreamExt, ready};
use greptime_proto::substrait_extension as pb;
use prost::Message;
use snafu::ResultExt;

use crate::error::{ColumnNotFoundSnafu, DataFusionPlanningSnafu, DeserializeSnafu, Result};
use crate::extension_plan::{Millisecond, resolve_column_name, serialize_column_index};

/// `ScalarCalculate` is the custom logical plan that implements PromQL's
/// [`scalar`](https://prometheus.io/docs/prometheus/latest/querying/functions/#scalar)
/// function: it returns the input as a scalar value when it contains exactly
/// one time series, and NaN when it contains zero or multiple time series.
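///
/// For example, `scalar(up{job="api"})` (an illustrative selector) yields the
/// matched series' sample value at each evaluation step when exactly one
/// series matches, and NaN at each step otherwise.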
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ScalarCalculate {
    start: Millisecond,
    end: Millisecond,
    interval: Millisecond,

    time_index: String,
    tag_columns: Vec<String>,
    field_column: String,
    input: LogicalPlan,
    output_schema: DFSchemaRef,
    /// Column indices carried over from deserialization; resolved back into
    /// column names in `with_exprs_and_inputs` once the real input is known.
    unfix: Option<UnfixIndices>,
}

/// Column indices recorded when a `ScalarCalculate` plan is serialized.
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd)]
struct UnfixIndices {
    pub time_index_idx: u64,
    pub tag_column_indices: Vec<u64>,
    pub field_column_idx: u64,
}

impl ScalarCalculate {
    /// Create a new `ScalarCalculate` plan.
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        start: Millisecond,
        end: Millisecond,
        interval: Millisecond,
        input: LogicalPlan,
        time_index: &str,
        tag_columns: &[String],
        field_column: &str,
        table_name: Option<&str>,
    ) -> Result<Self> {
        let input_schema = input.schema();
        let Ok(ts_field) = input_schema
            .field_with_unqualified_name(time_index)
            .cloned()
        else {
            return ColumnNotFoundSnafu { col: time_index }.fail();
        };
        let val_field = Field::new(format!("scalar({})", field_column), DataType::Float64, true);
        let qualifier = table_name.map(TableReference::bare);
        let schema = DFSchema::new_with_metadata(
            vec![
                (qualifier.clone(), Arc::new(ts_field)),
                (qualifier, Arc::new(val_field)),
            ],
            input_schema.metadata().clone(),
        )
        .context(DataFusionPlanningSnafu)?;

        Ok(Self {
            start,
            end,
            interval,
            time_index: time_index.to_string(),
            tag_columns: tag_columns.to_vec(),
            field_column: field_column.to_string(),
            input,
            output_schema: Arc::new(schema),
            unfix: None,
        })
    }

    /// The name of this custom plan.
    pub const fn name() -> &'static str {
        "ScalarCalculate"
    }

    /// Create a new execution plan from this `ScalarCalculate`.
    pub fn to_execution_plan(
        &self,
        exec_input: Arc<dyn ExecutionPlan>,
    ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
        let fields: Vec<_> = self
            .output_schema
            .fields()
            .iter()
            .map(|field| Field::new(field.name(), field.data_type().clone(), field.is_nullable()))
            .collect();
        let input_schema = exec_input.schema();
        let ts_index = input_schema
            .index_of(&self.time_index)
            .map_err(|e| DataFusionError::ArrowError(Box::new(e), None))?;
        let val_index = input_schema
            .index_of(&self.field_column)
            .map_err(|e| DataFusionError::ArrowError(Box::new(e), None))?;
        let schema = Arc::new(Schema::new(fields));
        let properties = exec_input.properties();
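        // scalar() must see every series to decide between a value and NaN,
        // so the output is declared as a single partition.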
        let properties = PlanProperties::new(
            EquivalenceProperties::new(schema.clone()),
            Partitioning::UnknownPartitioning(1),
            properties.emission_type,
            properties.boundedness,
        );
        Ok(Arc::new(ScalarCalculateExec {
            start: self.start,
            end: self.end,
            interval: self.interval,
            schema,
            input: exec_input,
            project_index: (ts_index, val_index),
            tag_columns: self.tag_columns.clone(),
            metric: ExecutionPlanMetricsSet::new(),
            properties,
        }))
    }

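    /// Serialize this plan into protobuf bytes. Column references are encoded
    /// as indices into the input schema rather than as names.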
    pub fn serialize(&self) -> Vec<u8> {
        let time_index_idx = serialize_column_index(self.input.schema(), &self.time_index);

        let tag_column_indices = self
            .tag_columns
            .iter()
            .map(|name| serialize_column_index(self.input.schema(), name))
            .collect::<Vec<u64>>();

        let field_column_idx = serialize_column_index(self.input.schema(), &self.field_column);

        pb::ScalarCalculate {
            start: self.start,
            end: self.end,
            interval: self.interval,
            time_index_idx,
            tag_column_indices,
            field_column_idx,
            ..Default::default()
        }
        .encode_to_vec()
    }

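    /// Deserialize a `ScalarCalculate` from protobuf bytes. The message only
    /// carries column indices, so the returned plan holds placeholder names
    /// and schema until `with_exprs_and_inputs` resolves the indices against
    /// the real input schema.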
    pub fn deserialize(bytes: &[u8]) -> Result<Self> {
        let pb_scalar_calculate = pb::ScalarCalculate::decode(bytes).context(DeserializeSnafu)?;
        let placeholder_plan = LogicalPlan::EmptyRelation(EmptyRelation {
            produce_one_row: false,
            schema: Arc::new(DFSchema::empty()),
        });

        let unfix = UnfixIndices {
            time_index_idx: pb_scalar_calculate.time_index_idx,
            tag_column_indices: pb_scalar_calculate.tag_column_indices.clone(),
            field_column_idx: pb_scalar_calculate.field_column_idx,
        };

        // TODO(Taylor-lagrange): support timestamps of different precisions
        let ts_field = Field::new(
            "placeholder_time_index",
            DataType::Timestamp(TimeUnit::Millisecond, None),
            true,
        );
        let val_field = Field::new("placeholder_field", DataType::Float64, true);
        // TODO(Taylor-lagrange): the table name is missing in the pb definition
        let schema = DFSchema::new_with_metadata(
            vec![(None, Arc::new(ts_field)), (None, Arc::new(val_field))],
            HashMap::new(),
        )
        .context(DataFusionPlanningSnafu)?;

        Ok(Self {
            start: pb_scalar_calculate.start,
            end: pb_scalar_calculate.end,
            interval: pb_scalar_calculate.interval,
            time_index: String::new(),
            tag_columns: Vec::new(),
            field_column: String::new(),
            output_schema: Arc::new(schema),
            input: placeholder_plan,
            unfix: Some(unfix),
        })
    }
}

impl PartialOrd for ScalarCalculate {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        // Compare fields in order, excluding output_schema
        match self.start.partial_cmp(&other.start) {
            Some(core::cmp::Ordering::Equal) => {}
            ord => return ord,
        }
        match self.end.partial_cmp(&other.end) {
            Some(core::cmp::Ordering::Equal) => {}
            ord => return ord,
        }
        match self.interval.partial_cmp(&other.interval) {
            Some(core::cmp::Ordering::Equal) => {}
            ord => return ord,
        }
        match self.time_index.partial_cmp(&other.time_index) {
            Some(core::cmp::Ordering::Equal) => {}
            ord => return ord,
        }
        match self.tag_columns.partial_cmp(&other.tag_columns) {
            Some(core::cmp::Ordering::Equal) => {}
            ord => return ord,
        }
        match self.field_column.partial_cmp(&other.field_column) {
            Some(core::cmp::Ordering::Equal) => {}
            ord => return ord,
        }
        self.input.partial_cmp(&other.input)
    }
}

impl UserDefinedLogicalNodeCore for ScalarCalculate {
    fn name(&self) -> &str {
        Self::name()
    }

    fn inputs(&self) -> Vec<&LogicalPlan> {
        vec![&self.input]
    }

    fn schema(&self) -> &DFSchemaRef {
        &self.output_schema
    }

    fn expressions(&self) -> Vec<Expr> {
        vec![]
    }

    fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(f, "ScalarCalculate: tags={:?}", self.tag_columns)
    }

    fn with_exprs_and_inputs(
        &self,
        exprs: Vec<Expr>,
        inputs: Vec<LogicalPlan>,
    ) -> DataFusionResult<Self> {
        if !exprs.is_empty() {
            return Err(DataFusionError::Internal(
                "ScalarCalculate should not have any expressions".to_string(),
            ));
        }

        let input: LogicalPlan = inputs.into_iter().next().unwrap();
        let input_schema = input.schema();

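        // A deserialized plan only carries column indices (`unfix`); now that
        // the real input schema is available, resolve them back into names.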
        if let Some(unfix) = &self.unfix {
            // transform indices to names
            let time_index = resolve_column_name(
                unfix.time_index_idx,
                input_schema,
                "ScalarCalculate",
                "time index",
            )?;

            let tag_columns = unfix
                .tag_column_indices
                .iter()
                .map(|idx| resolve_column_name(*idx, input_schema, "ScalarCalculate", "tag"))
                .collect::<DataFusionResult<Vec<String>>>()?;

            let field_column = resolve_column_name(
                unfix.field_column_idx,
                input_schema,
                "ScalarCalculate",
                "field",
            )?;

            // Recreate output schema with actual field names
            let ts_field = Field::new(
                &time_index,
                DataType::Timestamp(TimeUnit::Millisecond, None),
                true,
            );
            let val_field =
                Field::new(format!("scalar({})", field_column), DataType::Float64, true);
            let schema = DFSchema::new_with_metadata(
                vec![(None, Arc::new(ts_field)), (None, Arc::new(val_field))],
                HashMap::new(),
            )
            .context(DataFusionPlanningSnafu)?;

            Ok(ScalarCalculate {
                start: self.start,
                end: self.end,
                interval: self.interval,
                time_index,
                tag_columns,
                field_column,
                input,
                output_schema: Arc::new(schema),
                unfix: None,
            })
        } else {
            Ok(ScalarCalculate {
                start: self.start,
                end: self.end,
                interval: self.interval,
                time_index: self.time_index.clone(),
                tag_columns: self.tag_columns.clone(),
                field_column: self.field_column.clone(),
                input,
                output_schema: self.output_schema.clone(),
                unfix: None,
            })
        }
    }
}

#[derive(Debug, Clone)]
struct ScalarCalculateExec {
    start: Millisecond,
    end: Millisecond,
    interval: Millisecond,
    schema: SchemaRef,
    project_index: (usize, usize),
    input: Arc<dyn ExecutionPlan>,
    tag_columns: Vec<String>,
    metric: ExecutionPlanMetricsSet,
    properties: PlanProperties,
}

impl ExecutionPlan for ScalarCalculateExec {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

    fn properties(&self) -> &PlanProperties {
        &self.properties
    }

    fn maintains_input_order(&self) -> Vec<bool> {
        vec![true; self.children().len()]
    }

    fn required_input_distribution(&self) -> Vec<Distribution> {
        vec![Distribution::SinglePartition]
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![&self.input]
    }

    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
        Ok(Arc::new(ScalarCalculateExec {
            start: self.start,
            end: self.end,
            interval: self.interval,
            schema: self.schema.clone(),
            project_index: self.project_index,
            tag_columns: self.tag_columns.clone(),
            input: children[0].clone(),
            metric: self.metric.clone(),
            properties: self.properties.clone(),
        }))
    }

    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> DataFusionResult<SendableRecordBatchStream> {
        let baseline_metric = BaselineMetrics::new(&self.metric, partition);
        let input = self.input.execute(partition, context)?;
        let schema = input.schema();
        let tag_indices = self
            .tag_columns
            .iter()
            .map(|tag| {
                schema
                    .column_with_name(tag)
                    .unwrap_or_else(|| panic!("tag column not found {tag}"))
                    .0
            })
            .collect();

        Ok(Box::pin(ScalarCalculateStream {
            start: self.start,
            end: self.end,
            interval: self.interval,
            schema: self.schema.clone(),
            project_index: self.project_index,
            metric: baseline_metric,
            tag_indices,
            input,
            have_multi_series: false,
            done: false,
            batch: None,
            tag_value: None,
        }))
    }

    fn metrics(&self) -> Option<MetricsSet> {
        Some(self.metric.clone_inner())
    }

    fn partition_statistics(&self, partition: Option<usize>) -> DataFusionResult<Statistics> {
        let input_stats = self.input.partition_statistics(partition)?;

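        // Rough estimate: one output row per step. E.g. start=0, end=15_000,
        // interval=5_000 estimates 3 rows, while the actual inclusive range
        // emits 4; hence Precision::Inexact.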
        let estimated_row_num = (self.end - self.start) as f64 / self.interval as f64;
        let estimated_total_bytes = input_stats
            .total_byte_size
            .get_value()
            .zip(input_stats.num_rows.get_value())
            .map(|(size, rows)| {
                Precision::Inexact(((*size as f64 / *rows as f64) * estimated_row_num).floor() as _)
            })
            .unwrap_or_default();

        Ok(Statistics {
            num_rows: Precision::Inexact(estimated_row_num as _),
            total_byte_size: estimated_total_bytes,
            // TODO(ruihang): support this column statistics
            column_statistics: Statistics::unknown_column(&self.schema()),
        })
    }

    fn name(&self) -> &str {
        "ScalarCalculateExec"
    }
}

impl DisplayAs for ScalarCalculateExec {
    fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        match t {
            DisplayFormatType::Default
            | DisplayFormatType::Verbose
            | DisplayFormatType::TreeRender => {
                write!(f, "ScalarCalculateExec: tags={:?}", self.tag_columns)
            }
        }
    }
}

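/// Stream that buffers incoming batches as long as they all belong to a
/// single time series and emits them as the scalar result; when zero or
/// multiple series are observed it emits NaN for every step in `[start, end]`.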
struct ScalarCalculateStream {
    start: Millisecond,
    end: Millisecond,
    interval: Millisecond,
    schema: SchemaRef,
    input: SendableRecordBatchStream,
    metric: BaselineMetrics,
    tag_indices: Vec<usize>,
    /// In the format `(ts_index, field_index)`
    project_index: (usize, usize),
    have_multi_series: bool,
    done: bool,
    batch: Option<RecordBatch>,
    tag_value: Option<Vec<String>>,
}

impl RecordBatchStream for ScalarCalculateStream {
    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }
}

impl ScalarCalculateStream {
    fn update_batch(&mut self, batch: RecordBatch) -> DataFusionResult<()> {
        let _timer = self.metric.elapsed_compute();
        // if multiple time series were seen or the batch is empty, scalar returns NaN
        if self.have_multi_series || batch.num_rows() == 0 {
            return Ok(());
        }
        // fast path: no tag columns means all data belongs to the same series.
        if self.tag_indices.is_empty() {
            self.append_batch(batch)?;
            return Ok(());
        }
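        // `all_same` returns true when every value in a tag column equals
        // `val`, or, when `val` is None, when the column is constant.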
        let all_same = |val: Option<&str>, array: &StringArray| -> bool {
            if let Some(v) = val {
                array.iter().all(|s| s == Some(v))
            } else {
                array.is_empty() || array.iter().skip(1).all(|s| s == Some(array.value(0)))
            }
        };
        // assert the entire batch belongs to the same series
        let all_tag_columns_same = if let Some(tags) = &self.tag_value {
            tags.iter()
                .zip(self.tag_indices.iter())
                .all(|(value, index)| {
                    let array = batch.column(*index);
                    let string_array = array.as_any().downcast_ref::<StringArray>().unwrap();
                    all_same(Some(value), string_array)
                })
        } else {
            let mut tag_values = Vec::with_capacity(self.tag_indices.len());
            let is_same = self.tag_indices.iter().all(|index| {
                let array = batch.column(*index);
                let string_array = array.as_any().downcast_ref::<StringArray>().unwrap();
                tag_values.push(string_array.value(0).to_string());
                all_same(None, string_array)
            });
            self.tag_value = Some(tag_values);
            is_same
        };
        if all_tag_columns_same {
            self.append_batch(batch)?;
        } else {
            self.have_multi_series = true;
        }
        Ok(())
    }

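    /// Project the input batch down to `(timestamp, value)`, casting the
    /// value column to `Float64`, and append it to the buffered batch.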
    fn append_batch(&mut self, input_batch: RecordBatch) -> DataFusionResult<()> {
        let ts_column = input_batch.column(self.project_index.0).clone();
        let val_column = cast_with_options(
            input_batch.column(self.project_index.1),
            &DataType::Float64,
            &CastOptions::default(),
        )?;
        let input_batch = RecordBatch::try_new(self.schema.clone(), vec![ts_column, val_column])?;
        if let Some(batch) = &self.batch {
            self.batch = Some(concat_batches(&self.schema, vec![batch, &input_batch])?);
        } else {
            self.batch = Some(input_batch);
        }
        Ok(())
    }
}

impl Stream for ScalarCalculateStream {
    type Item = DataFusionResult<RecordBatch>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        loop {
            if self.done {
                return Poll::Ready(None);
            }
            match ready!(self.input.poll_next_unpin(cx)) {
                Some(Ok(batch)) => {
                    self.update_batch(batch)?;
                }
                // the inner stream returned an error, propagate it to the caller
                Some(Err(e)) => return Poll::Ready(Some(Err(e))),
                // the inner stream is done, produce the output
                None => {
                    self.done = true;
                    return match self.batch.take() {
                        Some(batch) if !self.have_multi_series => {
                            self.metric.record_output(batch.num_rows());
                            Poll::Ready(Some(Ok(batch)))
                        }
                        _ => {
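                            // no single-series result collected: emit NaN for
                            // every step of the inclusive [start, end] range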
                            let time_array = (self.start..=self.end)
                                .step_by(self.interval as _)
                                .collect::<Vec<_>>();
                            let nums = time_array.len();
                            let nan_batch = RecordBatch::try_new(
                                self.schema.clone(),
                                vec![
                                    Arc::new(TimestampMillisecondArray::from(time_array)),
                                    Arc::new(Float64Array::from(vec![f64::NAN; nums])),
                                ],
                            )?;
                            self.metric.record_output(nan_batch.num_rows());
                            Poll::Ready(Some(Ok(nan_batch)))
                        }
                    };
                }
            };
        }
    }
}

#[cfg(test)]
mod test {
    use datafusion::arrow::datatypes::{DataType, Field, Schema};
    use datafusion::datasource::memory::MemorySourceConfig;
    use datafusion::datasource::source::DataSourceExec;
    use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
    use datafusion::prelude::SessionContext;
    use datatypes::arrow::array::{Float64Array, TimestampMillisecondArray};
    use datatypes::arrow::datatypes::TimeUnit;

    use super::*;

    fn prepare_test_data(series: Vec<RecordBatch>) -> DataSourceExec {
        let schema = Arc::new(Schema::new(vec![
            Field::new("ts", DataType::Timestamp(TimeUnit::Millisecond, None), true),
            Field::new("tag1", DataType::Utf8, true),
            Field::new("tag2", DataType::Utf8, true),
            Field::new("val", DataType::Float64, true),
        ]));
        DataSourceExec::new(Arc::new(
            MemorySourceConfig::try_new(&[series], schema, None).unwrap(),
        ))
    }

    async fn run_test(series: Vec<RecordBatch>, expected: &str) {
        let memory_exec = Arc::new(prepare_test_data(series));
        let schema = Arc::new(Schema::new(vec![
            Field::new("ts", DataType::Timestamp(TimeUnit::Millisecond, None), true),
            Field::new("val", DataType::Float64, true),
        ]));
        let properties = PlanProperties::new(
            EquivalenceProperties::new(schema.clone()),
            Partitioning::UnknownPartitioning(1),
            EmissionType::Incremental,
            Boundedness::Bounded,
        );
        let scalar_exec = Arc::new(ScalarCalculateExec {
            start: 0,
            end: 15_000,
            interval: 5000,
            tag_columns: vec!["tag1".to_string(), "tag2".to_string()],
            input: memory_exec,
            schema,
            project_index: (0, 3),
            metric: ExecutionPlanMetricsSet::new(),
            properties,
        });
        let session_context = SessionContext::default();
        let result = datafusion::physical_plan::collect(scalar_exec, session_context.task_ctx())
            .await
            .unwrap();
        let result_literal = datatypes::arrow::util::pretty::pretty_format_batches(&result)
            .unwrap()
            .to_string();
        assert_eq!(result_literal, expected);
    }

    #[tokio::test]
    async fn same_series() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("ts", DataType::Timestamp(TimeUnit::Millisecond, None), true),
            Field::new("tag1", DataType::Utf8, true),
            Field::new("tag2", DataType::Utf8, true),
            Field::new("val", DataType::Float64, true),
        ]));
        run_test(
            vec![
                RecordBatch::try_new(
                    schema.clone(),
                    vec![
                        Arc::new(TimestampMillisecondArray::from(vec![0, 5_000])),
                        Arc::new(StringArray::from(vec!["foo", "foo"])),
                        Arc::new(StringArray::from(vec!["🥺", "🥺"])),
                        Arc::new(Float64Array::from(vec![1.0, 2.0])),
                    ],
                )
                .unwrap(),
                RecordBatch::try_new(
                    schema,
                    vec![
                        Arc::new(TimestampMillisecondArray::from(vec![10_000, 15_000])),
                        Arc::new(StringArray::from(vec!["foo", "foo"])),
                        Arc::new(StringArray::from(vec!["🥺", "🥺"])),
                        Arc::new(Float64Array::from(vec![3.0, 4.0])),
                    ],
                )
                .unwrap(),
            ],
            "+---------------------+-----+\
            \n| ts                  | val |\
            \n+---------------------+-----+\
            \n| 1970-01-01T00:00:00 | 1.0 |\
            \n| 1970-01-01T00:00:05 | 2.0 |\
            \n| 1970-01-01T00:00:10 | 3.0 |\
            \n| 1970-01-01T00:00:15 | 4.0 |\
            \n+---------------------+-----+",
        )
        .await
    }
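
    #[tokio::test]
    async fn no_tag_columns() {
        // A minimal sketch (not part of the original suite) exercising the
        // fast path: with an empty `tag_columns`, all rows are treated as one
        // series and the values pass through unchanged.
        let schema = Arc::new(Schema::new(vec![
            Field::new("ts", DataType::Timestamp(TimeUnit::Millisecond, None), true),
            Field::new("val", DataType::Float64, true),
        ]));
        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(TimestampMillisecondArray::from(vec![0, 5_000])),
                Arc::new(Float64Array::from(vec![1.0, 2.0])),
            ],
        )
        .unwrap();
        let memory_exec = Arc::new(DataSourceExec::new(Arc::new(
            MemorySourceConfig::try_new(&[vec![batch]], schema.clone(), None).unwrap(),
        )));
        let properties = PlanProperties::new(
            EquivalenceProperties::new(schema.clone()),
            Partitioning::UnknownPartitioning(1),
            EmissionType::Incremental,
            Boundedness::Bounded,
        );
        let scalar_exec = Arc::new(ScalarCalculateExec {
            start: 0,
            end: 5_000,
            interval: 5_000,
            tag_columns: vec![],
            input: memory_exec,
            schema,
            project_index: (0, 1),
            metric: ExecutionPlanMetricsSet::new(),
            properties,
        });
        let session_context = SessionContext::default();
        let result = datafusion::physical_plan::collect(scalar_exec, session_context.task_ctx())
            .await
            .unwrap();
        // both input rows survive as the scalar result
        let total_rows: usize = result.iter().map(|batch| batch.num_rows()).sum();
        assert_eq!(total_rows, 2);
    }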

    #[tokio::test]
    async fn diff_series() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("ts", DataType::Timestamp(TimeUnit::Millisecond, None), true),
            Field::new("tag1", DataType::Utf8, true),
            Field::new("tag2", DataType::Utf8, true),
            Field::new("val", DataType::Float64, true),
        ]));
        run_test(
            vec![
                RecordBatch::try_new(
                    schema.clone(),
                    vec![
                        Arc::new(TimestampMillisecondArray::from(vec![0, 5_000])),
                        Arc::new(StringArray::from(vec!["foo", "foo"])),
                        Arc::new(StringArray::from(vec!["🥺", "🥺"])),
                        Arc::new(Float64Array::from(vec![1.0, 2.0])),
                    ],
                )
                .unwrap(),
                RecordBatch::try_new(
                    schema,
                    vec![
                        Arc::new(TimestampMillisecondArray::from(vec![10_000, 15_000])),
                        Arc::new(StringArray::from(vec!["foo", "foo"])),
                        Arc::new(StringArray::from(vec!["🥺", "😝"])),
                        Arc::new(Float64Array::from(vec![3.0, 4.0])),
                    ],
                )
                .unwrap(),
            ],
            "+---------------------+-----+\
            \n| ts                  | val |\
            \n+---------------------+-----+\
            \n| 1970-01-01T00:00:00 | NaN |\
            \n| 1970-01-01T00:00:05 | NaN |\
            \n| 1970-01-01T00:00:10 | NaN |\
            \n| 1970-01-01T00:00:15 | NaN |\
            \n+---------------------+-----+",
        )
        .await
    }

    #[tokio::test]
    async fn empty_series() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("ts", DataType::Timestamp(TimeUnit::Millisecond, None), true),
            Field::new("tag1", DataType::Utf8, true),
            Field::new("tag2", DataType::Utf8, true),
            Field::new("val", DataType::Float64, true),
        ]));
        run_test(
            vec![
                RecordBatch::try_new(
                    schema,
                    vec![
                        Arc::new(TimestampMillisecondArray::new_null(0)),
                        Arc::new(StringArray::new_null(0)),
                        Arc::new(StringArray::new_null(0)),
                        Arc::new(Float64Array::new_null(0)),
                    ],
                )
                .unwrap(),
            ],
            "+---------------------+-----+\
            \n| ts                  | val |\
            \n+---------------------+-----+\
            \n| 1970-01-01T00:00:00 | NaN |\
            \n| 1970-01-01T00:00:05 | NaN |\
            \n| 1970-01-01T00:00:10 | NaN |\
            \n| 1970-01-01T00:00:15 | NaN |\
            \n+---------------------+-----+",
        )
        .await
    }
}