query/range_select/
plan.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp::Ordering;
16use std::collections::btree_map::Entry;
17use std::collections::{BTreeMap, HashMap};
18use std::fmt::Display;
19use std::pin::Pin;
20use std::sync::Arc;
21use std::task::{Context, Poll};
22use std::time::Duration;
23
24use ahash::RandomState;
25use arrow::compute::{self, cast_with_options, take_arrays, CastOptions};
26use arrow_schema::{DataType, Field, Schema, SchemaRef, SortOptions, TimeUnit};
27use common_recordbatch::DfSendableRecordBatchStream;
28use datafusion::common::{Result as DataFusionResult, Statistics};
29use datafusion::error::Result as DfResult;
30use datafusion::execution::context::SessionState;
31use datafusion::execution::TaskContext;
32use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
33use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
34use datafusion::physical_plan::{
35    DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, RecordBatchStream,
36    SendableRecordBatchStream,
37};
38use datafusion::physical_planner::create_physical_sort_expr;
39use datafusion_common::hash_utils::create_hashes;
40use datafusion_common::{DFSchema, DFSchemaRef, DataFusionError, ScalarValue};
41use datafusion_expr::utils::{exprlist_to_fields, COUNT_STAR_EXPANSION};
42use datafusion_expr::{
43    lit, Accumulator, Expr, ExprSchemable, LogicalPlan, UserDefinedLogicalNodeCore,
44};
45use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr};
46use datafusion_physical_expr::{
47    create_physical_expr, Distribution, EquivalenceProperties, LexOrdering, Partitioning,
48    PhysicalExpr, PhysicalSortExpr,
49};
50use datatypes::arrow::array::{
51    Array, ArrayRef, TimestampMillisecondArray, TimestampMillisecondBuilder, UInt32Builder,
52};
53use datatypes::arrow::datatypes::{ArrowPrimitiveType, TimestampMillisecondType};
54use datatypes::arrow::record_batch::RecordBatch;
55use datatypes::arrow::row::{OwnedRow, RowConverter, SortField};
56use futures::{ready, Stream};
57use futures_util::StreamExt;
58use snafu::ensure;
59
60use crate::error::{RangeQuerySnafu, Result};
61
/// Millisecond value in the native integer representation of Arrow's
/// `TimestampMillisecondType`; used for range and align lengths in the physical plan.
type Millisecond = <TimestampMillisecondType as ArrowPrimitiveType>::Native;
63
64#[derive(PartialEq, Eq, Debug, Hash, Clone)]
65pub enum Fill {
66    Null,
67    Prev,
68    Linear,
69    Const(ScalarValue),
70}
71
72impl Display for Fill {
73    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
74        match self {
75            Fill::Null => write!(f, "NULL"),
76            Fill::Prev => write!(f, "PREV"),
77            Fill::Linear => write!(f, "LINEAR"),
78            Fill::Const(x) => write!(f, "{}", x),
79        }
80    }
81}
82
83impl Fill {
84    pub fn try_from_str(value: &str, datatype: &DataType) -> DfResult<Option<Self>> {
85        let s = value.to_uppercase();
86        match s.as_str() {
87            "" => Ok(None),
88            "NULL" => Ok(Some(Self::Null)),
89            "PREV" => Ok(Some(Self::Prev)),
90            "LINEAR" => {
91                if datatype.is_numeric() {
92                    Ok(Some(Self::Linear))
93                } else {
94                    Err(DataFusionError::Plan(format!(
95                        "Use FILL LINEAR on Non-numeric DataType {}",
96                        datatype
97                    )))
98                }
99            }
100            _ => ScalarValue::try_from_string(s.clone(), datatype)
101                .map_err(|err| {
102                    DataFusionError::Plan(format!(
103                        "{} is not a valid fill option, fail to convert to a const value. {{ {} }}",
104                        s, err
105                    ))
106                })
107                .map(|x| Some(Fill::Const(x))),
108        }
109    }
110
111    /// The input `data` contains data on a complete time series.
112    /// If the filling strategy is `PREV` or `LINEAR`, caller must be ensured that the incoming `ts`&`data` is ascending time order.
113    pub fn apply_fill_strategy(&self, ts: &[i64], data: &mut [ScalarValue]) -> DfResult<()> {
114        // No calculation need in `Fill::Null`
115        if matches!(self, Fill::Null) {
116            return Ok(());
117        }
118        let len = data.len();
119        if *self == Fill::Linear {
120            return Self::fill_linear(ts, data);
121        }
122        for i in 0..len {
123            if data[i].is_null() {
124                match self {
125                    Fill::Prev => {
126                        if i != 0 {
127                            data[i] = data[i - 1].clone()
128                        }
129                    }
130                    // The calculation of linear interpolation is relatively complicated.
131                    // `Self::fill_linear` is used to dispose `Fill::Linear`.
132                    // No calculation need in `Fill::Null`
133                    Fill::Linear | Fill::Null => unreachable!(),
134                    Fill::Const(v) => data[i] = v.clone(),
135                }
136            }
137        }
138        Ok(())
139    }
140
141    fn fill_linear(ts: &[i64], data: &mut [ScalarValue]) -> DfResult<()> {
142        let not_null_num = data
143            .iter()
144            .fold(0, |acc, x| if x.is_null() { acc } else { acc + 1 });
145        // We need at least two non-empty data points to perform linear interpolation
146        if not_null_num < 2 {
147            return Ok(());
148        }
149        let mut index = 0;
150        let mut head: Option<usize> = None;
151        let mut tail: Option<usize> = None;
152        while index < data.len() {
153            // find null interval [start, end)
154            // start is null, end is not-null
155            let start = data[index..]
156                .iter()
157                .position(ScalarValue::is_null)
158                .unwrap_or(data.len() - index)
159                + index;
160            if start == data.len() {
161                break;
162            }
163            let end = data[start..]
164                .iter()
165                .position(|r| !r.is_null())
166                .unwrap_or(data.len() - start)
167                + start;
168            index = end + 1;
169            // head or tail null dispose later, record start/end first
170            if start == 0 {
171                head = Some(end);
172            } else if end == data.len() {
173                tail = Some(start);
174            } else {
175                linear_interpolation(ts, data, start - 1, end, start, end)?;
176            }
177        }
178        // dispose head null interval
179        if let Some(end) = head {
180            linear_interpolation(ts, data, end, end + 1, 0, end)?;
181        }
182        // dispose tail null interval
183        if let Some(start) = tail {
184            linear_interpolation(ts, data, start - 2, start - 1, start, data.len())?;
185        }
186        Ok(())
187    }
188}
189
190/// use `(ts[i1], data[i1])`, `(ts[i2], data[i2])` as endpoint, linearly interpolates element over the interval `[start, end)`
191fn linear_interpolation(
192    ts: &[i64],
193    data: &mut [ScalarValue],
194    i1: usize,
195    i2: usize,
196    start: usize,
197    end: usize,
198) -> DfResult<()> {
199    let (x0, x1) = (ts[i1] as f64, ts[i2] as f64);
200    let (y0, y1, is_float32) = match (&data[i1], &data[i2]) {
201        (ScalarValue::Float64(Some(y0)), ScalarValue::Float64(Some(y1))) => (*y0, *y1, false),
202        (ScalarValue::Float32(Some(y0)), ScalarValue::Float32(Some(y1))) => {
203            (*y0 as f64, *y1 as f64, true)
204        }
205        _ => {
206            return Err(DataFusionError::Execution(
207                "RangePlan: Apply Fill LINEAR strategy on Non-floating type".to_string(),
208            ));
209        }
210    };
211    // To avoid divide zero error, kind of defensive programming
212    if x1 == x0 {
213        return Err(DataFusionError::Execution(
214            "RangePlan: Linear interpolation using the same coordinate points".to_string(),
215        ));
216    }
217    for i in start..end {
218        let val = y0 + (y1 - y0) / (x1 - x0) * (ts[i] as f64 - x0);
219        data[i] = if is_float32 {
220            ScalarValue::Float32(Some(val as f32))
221        } else {
222            ScalarValue::Float64(Some(val))
223        }
224    }
225    Ok(())
226}
227
228#[derive(Eq, Clone, Debug)]
229pub struct RangeFn {
230    /// with format like `max(a) RANGE 300s [FILL NULL]`
231    pub name: String,
232    pub data_type: DataType,
233    pub expr: Expr,
234    pub range: Duration,
235    pub fill: Option<Fill>,
236    /// If the `FIll` strategy is `Linear` and the output is an integer,
237    /// it is possible to calculate a floating point number.
238    /// So for `FILL==LINEAR`, the entire data will be implicitly converted to Float type
239    /// If `need_cast==true`, `data_type` may not consist with type `expr` generated.
240    pub need_cast: bool,
241}
242
243impl PartialEq for RangeFn {
244    fn eq(&self, other: &Self) -> bool {
245        self.name == other.name
246    }
247}
248
249impl PartialOrd for RangeFn {
250    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
251        Some(self.cmp(other))
252    }
253}
254
255impl Ord for RangeFn {
256    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
257        self.name.cmp(&other.name)
258    }
259}
260
261impl std::hash::Hash for RangeFn {
262    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
263        self.name.hash(state);
264    }
265}
266
267impl Display for RangeFn {
268    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
269        write!(f, "{}", self.name)
270    }
271}
272
273#[derive(Debug, PartialEq, Eq, Hash)]
274pub struct RangeSelect {
275    /// The incoming logical plan
276    pub input: Arc<LogicalPlan>,
277    /// all range expressions
278    pub range_expr: Vec<RangeFn>,
279    pub align: Duration,
280    pub align_to: i64,
281    pub time_index: String,
282    pub time_expr: Expr,
283    pub by: Vec<Expr>,
284    pub schema: DFSchemaRef,
285    pub by_schema: DFSchemaRef,
286    /// If the `schema` of the `RangeSelect` happens to be the same as the content of the upper-level Projection Plan,
287    /// the final output needs to be `project` through `schema_project`,
288    /// so that we can omit the upper-level Projection Plan.
289    pub schema_project: Option<Vec<usize>>,
290    /// The schema before run projection, follow the order of `range expr | time index | by columns`
291    /// `schema_before_project  ----  schema_project ----> schema`
292    /// if `schema_project==None` then `schema_before_project==schema`
293    pub schema_before_project: DFSchemaRef,
294}
295
296impl PartialOrd for RangeSelect {
297    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
298        // Compare fields in order excluding `schema`, `by_schema`, `schema_before_project`.
299        match self.input.partial_cmp(&other.input) {
300            Some(Ordering::Equal) => {}
301            ord => return ord,
302        }
303        match self.range_expr.partial_cmp(&other.range_expr) {
304            Some(Ordering::Equal) => {}
305            ord => return ord,
306        }
307        match self.align.partial_cmp(&other.align) {
308            Some(Ordering::Equal) => {}
309            ord => return ord,
310        }
311        match self.align_to.partial_cmp(&other.align_to) {
312            Some(Ordering::Equal) => {}
313            ord => return ord,
314        }
315        match self.time_index.partial_cmp(&other.time_index) {
316            Some(Ordering::Equal) => {}
317            ord => return ord,
318        }
319        match self.time_expr.partial_cmp(&other.time_expr) {
320            Some(Ordering::Equal) => {}
321            ord => return ord,
322        }
323        match self.by.partial_cmp(&other.by) {
324            Some(Ordering::Equal) => {}
325            ord => return ord,
326        }
327        self.schema_project.partial_cmp(&other.schema_project)
328    }
329}
330
331impl RangeSelect {
332    pub fn try_new(
333        input: Arc<LogicalPlan>,
334        range_expr: Vec<RangeFn>,
335        align: Duration,
336        align_to: i64,
337        time_index: Expr,
338        by: Vec<Expr>,
339        projection_expr: &[Expr],
340    ) -> Result<Self> {
341        ensure!(
342            align.as_millis() != 0,
343            RangeQuerySnafu {
344                msg: "Can't use 0 as align in Range Query"
345            }
346        );
347        for expr in &range_expr {
348            ensure!(
349                expr.range.as_millis() != 0,
350                RangeQuerySnafu {
351                    msg: format!(
352                        "Invalid Range expr `{}`, Can't use 0 as range in Range Query",
353                        expr.name
354                    )
355                }
356            );
357        }
358        let mut fields = range_expr
359            .iter()
360            .map(
361                |RangeFn {
362                     name,
363                     data_type,
364                     fill,
365                     ..
366                 }| {
367                    let field = Field::new(
368                        name,
369                        data_type.clone(),
370                        // Only when data fill with Const option, the data can't be null
371                        !matches!(fill, Some(Fill::Const(..))),
372                    );
373                    Ok((None, Arc::new(field)))
374                },
375            )
376            .collect::<DfResult<Vec<_>>>()?;
377        // add align_ts
378        let ts_field = time_index.to_field(input.schema().as_ref())?;
379        let time_index_name = ts_field.1.name().clone();
380        fields.push(ts_field);
381        // add by
382        let by_fields = exprlist_to_fields(&by, &input)?;
383        fields.extend(by_fields.clone());
384        let schema_before_project = Arc::new(DFSchema::new_with_metadata(
385            fields,
386            input.schema().metadata().clone(),
387        )?);
388        let by_schema = Arc::new(DFSchema::new_with_metadata(
389            by_fields,
390            input.schema().metadata().clone(),
391        )?);
392        // If the results of project plan can be obtained directly from range plan without any additional
393        // calculations, no project plan is required. We can simply project the final output of the range
394        // plan to produce the final result.
395        let schema_project = projection_expr
396            .iter()
397            .map(|project_expr| {
398                if let Expr::Column(column) = project_expr {
399                    schema_before_project
400                        .index_of_column_by_name(column.relation.as_ref(), &column.name)
401                        .ok_or(())
402                } else {
403                    let (qualifier, field) = project_expr
404                        .to_field(input.schema().as_ref())
405                        .map_err(|_| ())?;
406                    schema_before_project
407                        .index_of_column_by_name(qualifier.as_ref(), field.name())
408                        .ok_or(())
409                }
410            })
411            .collect::<std::result::Result<Vec<usize>, ()>>()
412            .ok();
413        let schema = if let Some(project) = &schema_project {
414            let project_field = project
415                .iter()
416                .map(|i| {
417                    let f = schema_before_project.qualified_field(*i);
418                    (f.0.cloned(), Arc::new(f.1.clone()))
419                })
420                .collect();
421            Arc::new(DFSchema::new_with_metadata(
422                project_field,
423                input.schema().metadata().clone(),
424            )?)
425        } else {
426            schema_before_project.clone()
427        };
428        Ok(Self {
429            input,
430            range_expr,
431            align,
432            align_to,
433            time_index: time_index_name,
434            time_expr: time_index,
435            schema,
436            by_schema,
437            by,
438            schema_project,
439            schema_before_project,
440        })
441    }
442}
443
444impl UserDefinedLogicalNodeCore for RangeSelect {
445    fn name(&self) -> &str {
446        "RangeSelect"
447    }
448
449    fn inputs(&self) -> Vec<&LogicalPlan> {
450        vec![&self.input]
451    }
452
453    fn schema(&self) -> &DFSchemaRef {
454        &self.schema
455    }
456
457    fn expressions(&self) -> Vec<Expr> {
458        self.range_expr
459            .iter()
460            .map(|expr| expr.expr.clone())
461            .chain([self.time_expr.clone()])
462            .chain(self.by.clone())
463            .collect()
464    }
465
466    fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
467        write!(
468            f,
469            "RangeSelect: range_exprs=[{}], align={}ms, align_to={}ms, align_by=[{}], time_index={}",
470            self.range_expr
471                .iter()
472                .map(ToString::to_string)
473                .collect::<Vec<_>>()
474                .join(", "),
475            self.align.as_millis(),
476            self.align_to,
477            self.by
478                .iter()
479                .map(ToString::to_string)
480                .collect::<Vec<_>>()
481                .join(", "),
482            self.time_index
483        )
484    }
485
486    fn with_exprs_and_inputs(
487        &self,
488        exprs: Vec<Expr>,
489        inputs: Vec<LogicalPlan>,
490    ) -> DataFusionResult<Self> {
491        if inputs.is_empty() {
492            return Err(DataFusionError::Plan(
493                "RangeSelect: inputs is empty".to_string(),
494            ));
495        }
496        if exprs.len() != self.range_expr.len() + self.by.len() + 1 {
497            return Err(DataFusionError::Plan(
498                "RangeSelect: exprs length not match".to_string(),
499            ));
500        }
501
502        let range_expr = exprs
503            .iter()
504            .zip(self.range_expr.iter())
505            .map(|(e, range)| RangeFn {
506                name: range.name.clone(),
507                data_type: range.data_type.clone(),
508                expr: e.clone(),
509                range: range.range,
510                fill: range.fill.clone(),
511                need_cast: range.need_cast,
512            })
513            .collect();
514        let time_expr = exprs[self.range_expr.len()].clone();
515        let by = exprs[self.range_expr.len() + 1..].to_vec();
516        Ok(Self {
517            align: self.align,
518            align_to: self.align_to,
519            range_expr,
520            input: Arc::new(inputs[0].clone()),
521            time_index: self.time_index.clone(),
522            time_expr,
523            schema: self.schema.clone(),
524            by,
525            by_schema: self.by_schema.clone(),
526            schema_project: self.schema_project.clone(),
527            schema_before_project: self.schema_before_project.clone(),
528        })
529    }
530}
531
532impl RangeSelect {
533    fn create_physical_expr_list(
534        &self,
535        is_count_aggr: bool,
536        exprs: &[Expr],
537        df_schema: &Arc<DFSchema>,
538        session_state: &SessionState,
539    ) -> DfResult<Vec<Arc<dyn PhysicalExpr>>> {
540        exprs
541            .iter()
542            .map(|e| match e {
543                // `count(*)` will be rewritten by `CountWildcardRule` into `count(1)` when optimizing logical plan.
544                // The modification occurs after range plan rewrite.
545                // At this time, aggregate plan has been replaced by a custom range plan,
546                // so `CountWildcardRule` has not been applied.
547                // We manually modify it when creating the physical plan.
548                Expr::Wildcard { .. } if is_count_aggr => create_physical_expr(
549                    &lit(COUNT_STAR_EXPANSION),
550                    df_schema.as_ref(),
551                    session_state.execution_props(),
552                ),
553                _ => create_physical_expr(e, df_schema.as_ref(), session_state.execution_props()),
554            })
555            .collect::<DfResult<Vec<_>>>()
556    }
557
558    pub fn to_execution_plan(
559        &self,
560        logical_input: &LogicalPlan,
561        exec_input: Arc<dyn ExecutionPlan>,
562        session_state: &SessionState,
563    ) -> DfResult<Arc<dyn ExecutionPlan>> {
564        let fields: Vec<_> = self
565            .schema_before_project
566            .fields()
567            .iter()
568            .map(|field| Field::new(field.name(), field.data_type().clone(), field.is_nullable()))
569            .collect();
570        let by_fields: Vec<_> = self
571            .by_schema
572            .fields()
573            .iter()
574            .map(|field| Field::new(field.name(), field.data_type().clone(), field.is_nullable()))
575            .collect();
576        let input_dfschema = logical_input.schema();
577        let input_schema = exec_input.schema();
578        let range_exec: Vec<RangeFnExec> = self
579            .range_expr
580            .iter()
581            .map(|range_fn| {
582                let name = range_fn.expr.schema_name().to_string();
583                let range_expr = match &range_fn.expr {
584                    Expr::Alias(expr) => expr.expr.as_ref(),
585                    others => others,
586                };
587
588                let expr = match &range_expr {
589                    Expr::AggregateFunction(aggr)
590                        if (aggr.func.name() == "last_value"
591                            || aggr.func.name() == "first_value") =>
592                    {
593                        let order_by = if let Some(exprs) = &aggr.order_by {
594                            exprs
595                                .iter()
596                                .map(|x| {
597                                    create_physical_sort_expr(
598                                        x,
599                                        input_dfschema.as_ref(),
600                                        session_state.execution_props(),
601                                    )
602                                })
603                                .collect::<DfResult<Vec<_>>>()?
604                        } else {
605                            // if user not assign order by, time index is needed as default ordering
606                            let time_index = create_physical_expr(
607                                &self.time_expr,
608                                input_dfschema.as_ref(),
609                                session_state.execution_props(),
610                            )?;
611                            vec![PhysicalSortExpr {
612                                expr: time_index,
613                                options: SortOptions {
614                                    descending: false,
615                                    nulls_first: false,
616                                },
617                            }]
618                        };
619                        let arg = self.create_physical_expr_list(
620                            false,
621                            &aggr.args,
622                            input_dfschema,
623                            session_state,
624                        )?;
625                        // first_value/last_value has only one param.
626                        // The param have been checked by datafusion in logical plan stage.
627                        // We can safely assume that there is only one element here.
628                        AggregateExprBuilder::new(aggr.func.clone(), arg)
629                            .schema(input_schema.clone())
630                            .order_by(LexOrdering::new(order_by))
631                            .alias(name)
632                            .build()
633                    }
634                    Expr::AggregateFunction(aggr) => {
635                        let order_by = if let Some(exprs) = &aggr.order_by {
636                            exprs
637                                .iter()
638                                .map(|x| {
639                                    create_physical_sort_expr(
640                                        x,
641                                        input_dfschema.as_ref(),
642                                        session_state.execution_props(),
643                                    )
644                                })
645                                .collect::<DfResult<Vec<_>>>()?
646                        } else {
647                            vec![]
648                        };
649                        let distinct = aggr.distinct;
650                        // TODO(discord9): add default null treatment?
651
652                        let input_phy_exprs = self.create_physical_expr_list(
653                            aggr.func.name() == "count",
654                            &aggr.args,
655                            input_dfschema,
656                            session_state,
657                        )?;
658                        AggregateExprBuilder::new(aggr.func.clone(), input_phy_exprs)
659                            .schema(input_schema.clone())
660                            .order_by(LexOrdering::new(order_by))
661                            .with_distinct(distinct)
662                            .alias(name)
663                            .build()
664                    }
665                    _ => Err(DataFusionError::Plan(format!(
666                        "Unexpected Expr: {} in RangeSelect",
667                        range_fn.expr
668                    ))),
669                }?;
670                Ok(RangeFnExec {
671                    expr: Arc::new(expr),
672                    range: range_fn.range.as_millis() as Millisecond,
673                    fill: range_fn.fill.clone(),
674                    need_cast: if range_fn.need_cast {
675                        Some(range_fn.data_type.clone())
676                    } else {
677                        None
678                    },
679                })
680            })
681            .collect::<DfResult<Vec<_>>>()?;
682        let schema_before_project = Arc::new(Schema::new(fields));
683        let schema = if let Some(project) = &self.schema_project {
684            Arc::new(schema_before_project.project(project)?)
685        } else {
686            schema_before_project.clone()
687        };
688        let by = self.create_physical_expr_list(false, &self.by, input_dfschema, session_state)?;
689        let cache = PlanProperties::new(
690            EquivalenceProperties::new(schema.clone()),
691            Partitioning::UnknownPartitioning(1),
692            EmissionType::Incremental,
693            Boundedness::Bounded,
694        );
695        Ok(Arc::new(RangeSelectExec {
696            input: exec_input,
697            range_exec,
698            align: self.align.as_millis() as Millisecond,
699            align_to: self.align_to,
700            by,
701            time_index: self.time_index.clone(),
702            schema,
703            by_schema: Arc::new(Schema::new(by_fields)),
704            metric: ExecutionPlanMetricsSet::new(),
705            schema_before_project,
706            schema_project: self.schema_project.clone(),
707            cache,
708        }))
709    }
710}
711
712/// Range function expression.
713#[derive(Debug, Clone)]
714struct RangeFnExec {
715    expr: Arc<AggregateFunctionExpr>,
716    range: Millisecond,
717    fill: Option<Fill>,
718    need_cast: Option<DataType>,
719}
720
721impl RangeFnExec {
722    /// Returns the expressions to pass to the aggregator.
723    /// It also adds the order by expressions to the list of expressions.
724    /// Order-sensitive aggregators, such as `FIRST_VALUE(x ORDER BY y)` requires this.
725    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
726        let mut exprs = self.expr.expressions();
727        if let Some(ordering) = self.expr.order_bys() {
728            exprs.extend(ordering.iter().map(|sort| sort.expr.clone()));
729        }
730        exprs
731    }
732}
733
734impl Display for RangeFnExec {
735    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
736        if let Some(fill) = &self.fill {
737            write!(
738                f,
739                "{} RANGE {}s FILL {}",
740                self.expr.name(),
741                self.range / 1000,
742                fill
743            )
744        } else {
745            write!(f, "{} RANGE {}s", self.expr.name(), self.range / 1000)
746        }
747    }
748}
749
750#[derive(Debug)]
751pub struct RangeSelectExec {
752    input: Arc<dyn ExecutionPlan>,
753    range_exec: Vec<RangeFnExec>,
754    align: Millisecond,
755    align_to: i64,
756    time_index: String,
757    by: Vec<Arc<dyn PhysicalExpr>>,
758    schema: SchemaRef,
759    by_schema: SchemaRef,
760    metric: ExecutionPlanMetricsSet,
761    schema_project: Option<Vec<usize>>,
762    schema_before_project: SchemaRef,
763    cache: PlanProperties,
764}
765
766impl DisplayAs for RangeSelectExec {
767    fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
768        match t {
769            DisplayFormatType::Default | DisplayFormatType::Verbose => {
770                write!(f, "RangeSelectExec: ")?;
771                let range_expr_strs: Vec<String> =
772                    self.range_exec.iter().map(RangeFnExec::to_string).collect();
773                let by: Vec<String> = self.by.iter().map(|e| e.to_string()).collect();
774                write!(
775                    f,
776                    "range_expr=[{}], align={}ms, align_to={}ms, align_by=[{}], time_index={}",
777                    range_expr_strs.join(", "),
778                    self.align,
779                    self.align_to,
780                    by.join(", "),
781                    self.time_index,
782                )?;
783            }
784        }
785        Ok(())
786    }
787}
788
789impl ExecutionPlan for RangeSelectExec {
790    fn as_any(&self) -> &dyn std::any::Any {
791        self
792    }
793
794    fn schema(&self) -> SchemaRef {
795        self.schema.clone()
796    }
797
798    fn required_input_distribution(&self) -> Vec<Distribution> {
799        vec![Distribution::SinglePartition]
800    }
801
802    fn properties(&self) -> &PlanProperties {
803        &self.cache
804    }
805
806    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
807        vec![&self.input]
808    }
809
810    fn with_new_children(
811        self: Arc<Self>,
812        children: Vec<Arc<dyn ExecutionPlan>>,
813    ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
814        assert!(!children.is_empty());
815        Ok(Arc::new(Self {
816            input: children[0].clone(),
817            range_exec: self.range_exec.clone(),
818            time_index: self.time_index.clone(),
819            by: self.by.clone(),
820            align: self.align,
821            align_to: self.align_to,
822            schema: self.schema.clone(),
823            by_schema: self.by_schema.clone(),
824            metric: self.metric.clone(),
825            schema_before_project: self.schema_before_project.clone(),
826            schema_project: self.schema_project.clone(),
827            cache: self.cache.clone(),
828        }))
829    }
830
831    fn execute(
832        &self,
833        partition: usize,
834        context: Arc<TaskContext>,
835    ) -> DfResult<DfSendableRecordBatchStream> {
836        let baseline_metric = BaselineMetrics::new(&self.metric, partition);
837        let input = self.input.execute(partition, context)?;
838        let schema = input.schema();
839        let time_index = schema
840            .column_with_name(&self.time_index)
841            .ok_or(DataFusionError::Execution(
842                "time index column not found".into(),
843            ))?
844            .0;
845        let row_converter = RowConverter::new(
846            self.by_schema
847                .fields()
848                .iter()
849                .map(|f| SortField::new(f.data_type().clone()))
850                .collect(),
851        )?;
852        Ok(Box::pin(RangeSelectStream {
853            schema: self.schema.clone(),
854            range_exec: self.range_exec.clone(),
855            input,
856            random_state: RandomState::new(),
857            time_index,
858            align: self.align,
859            align_to: self.align_to,
860            by: self.by.clone(),
861            series_map: HashMap::new(),
862            exec_state: ExecutionState::ReadingInput,
863            num_not_null_rows: 0,
864            row_converter,
865            modify_map: HashMap::new(),
866            metric: baseline_metric,
867            schema_project: self.schema_project.clone(),
868            schema_before_project: self.schema_before_project.clone(),
869        }))
870    }
871
872    fn metrics(&self) -> Option<MetricsSet> {
873        Some(self.metric.clone_inner())
874    }
875
876    fn statistics(&self) -> DataFusionResult<Statistics> {
877        Ok(Statistics::new_unknown(self.schema.as_ref()))
878    }
879
880    fn name(&self) -> &str {
881        "RanegSelectExec"
882    }
883}
884
/// Stream produced by `RangeSelectExec::execute`: it consumes every input batch, folds the
/// rows into per-series, per-`align_ts` accumulators, and then emits one output batch.
struct RangeSelectStream {
    /// Schema of the batches this stream yields.
    schema: SchemaRef,
    /// Range functions to evaluate; their order defines the order of each accumulator `Vec`.
    range_exec: Vec<RangeFnExec>,
    /// Upstream input stream.
    input: SendableRecordBatchStream,
    /// Column index of the TIME INDEX column in the input schema.
    time_index: usize,
    /// Width of one time slot, in milliseconds.
    align: Millisecond,
    /// Origin of the slot grid (slots start at `align_to + k * align`), in milliseconds.
    align_to: i64,
    /// Expressions whose values identify a time series.
    by: Vec<Arc<dyn PhysicalExpr>>,
    /// Current phase of `poll_next`.
    exec_state: ExecutionState,
    /// Converts `by` values to and from the row format stored in `SeriesState::row`.
    row_converter: RowConverter,
    /// Hasher state used to hash `by` values into series keys.
    random_state: RandomState,
    /// key: hash of a series' `by` values
    /// value: that series' accumulators for each `align_ts`
    series_map: HashMap<u64, SeriesState>,
    /// key: `(hash of by values, align_ts)`
    /// value: indices of the rows in the current batch that belong to that slot
    /// Scratch space: cleared and rebuilt for each range function of each input batch.
    modify_map: HashMap<(u64, Millisecond), Vec<u32>>,
    /// Number of `(series, align_ts)` slots that received data; used as a capacity hint
    /// when building the output.
    num_not_null_rows: usize,
    /// Baseline execution metrics for this partition.
    metric: BaselineMetrics,
    /// Column indices to project the output to, if a projection is applied.
    schema_project: Option<Vec<usize>>,
    /// Schema of the output before `schema_project` is applied.
    schema_before_project: SchemaRef,
}
913
/// Accumulated state for one time series (one set of `by` values).
#[derive(Debug)]
struct SeriesState {
    /// The series' `by` values in row format, as produced by the stream's `RowConverter`.
    row: OwnedRow,
    /// key: align_ts (start of the time slot)
    /// value: one accumulator per range function, in the same order as `range_exec`
    align_ts_accumulator: BTreeMap<Millisecond, Vec<Box<dyn Accumulator>>>,
}
922
923/// Use `align_to` as time origin.
924/// According to `align` as time interval, produces aligned time.
925/// Combining the parameters related to the range query,
926/// determine for each `Accumulator` `(hash, align_ts)` define,
927/// which rows of data will be applied to it.
928fn produce_align_time(
929    align_to: i64,
930    range: Millisecond,
931    align: Millisecond,
932    ts_column: &TimestampMillisecondArray,
933    by_columns_hash: &[u64],
934    modify_map: &mut HashMap<(u64, Millisecond), Vec<u32>>,
935) {
936    modify_map.clear();
937    // make modify_map for range_fn[i]
938    for (row, hash) in by_columns_hash.iter().enumerate() {
939        let ts = ts_column.value(row);
940        let ith_slot = (ts - align_to).div_floor(align);
941        let mut align_ts = ith_slot * align + align_to;
942        while align_ts <= ts && ts < align_ts + range {
943            modify_map
944                .entry((*hash, align_ts))
945                .or_default()
946                .push(row as u32);
947            align_ts -= align;
948        }
949    }
950}
951
952fn cast_scalar_values(values: &mut [ScalarValue], data_type: &DataType) -> DfResult<()> {
953    let array = ScalarValue::iter_to_array(values.to_vec())?;
954    let cast_array = cast_with_options(&array, data_type, &CastOptions::default())?;
955    for (i, value) in values.iter_mut().enumerate() {
956        *value = ScalarValue::try_from_array(&cast_array, i)?;
957    }
958    Ok(())
959}
960
impl RangeSelectStream {
    /// Evaluates every expression in `exprs` against `batch`, materializing each result
    /// (scalar or array) as an array of `batch.num_rows()` rows.
    fn evaluate_many(
        &self,
        batch: &RecordBatch,
        exprs: &[Arc<dyn PhysicalExpr>],
    ) -> DfResult<Vec<ArrayRef>> {
        exprs
            .iter()
            .map(|expr| {
                let value = expr.evaluate(batch)?;
                value.into_array(batch.num_rows())
            })
            .collect::<DfResult<Vec<_>>>()
    }

    /// Folds one input batch into the per-series accumulators.
    ///
    /// For each range function `i`, rows are grouped by `(hash of by values, align_ts)` with
    /// `produce_align_time`. The argument columns are gathered so every group is contiguous,
    /// and each group is fed to accumulator `i` of its `(series, align_ts)` entry in
    /// `series_map`. The series entry and its accumulators are created on first sight.
    ///
    /// NOTE(review): series appear to be identified only by the 64-bit hash of the `by`
    /// values (both in `series_map` and `modify_map`), so two distinct `by` values with
    /// equal hashes would be merged into one series. Confirm this is an accepted trade-off.
    fn update_range_context(&mut self, batch: RecordBatch) -> DfResult<()> {
        let _timer = self.metric.elapsed_compute().timer();
        let num_rows = batch.num_rows();
        let by_arrays = self.evaluate_many(&batch, &self.by)?;
        let mut hashes = vec![0; num_rows];
        create_hashes(&by_arrays, &self.random_state, &mut hashes)?;
        // Row-format `by` values; one row is stored per newly seen series.
        let by_rows = self.row_converter.convert_columns(&by_arrays)?;
        let mut ts_column = batch.column(self.time_index).clone();
        // Normalize the time index to millisecond precision, the unit used by `align`.
        if !matches!(
            ts_column.data_type(),
            DataType::Timestamp(TimeUnit::Millisecond, _)
        ) {
            ts_column = compute::cast(
                ts_column.as_ref(),
                &DataType::Timestamp(TimeUnit::Millisecond, None),
            )?;
        }
        let ts_column_ref = ts_column
            .as_any()
            .downcast_ref::<TimestampMillisecondArray>()
            .ok_or_else(|| {
                DataFusionError::Execution(
                    "Time index Column downcast to TimestampMillisecondArray failed".into(),
                )
            })?;
        for i in 0..self.range_exec.len() {
            let args = self.evaluate_many(&batch, &self.range_exec[i].expressions())?;
            // Fill self.modify_map with (hash, align_ts) => [row indices] for range_fn[i].
            produce_align_time(
                self.align_to,
                self.range_exec[i].range,
                self.align,
                ts_column_ref,
                &hashes,
                &mut self.modify_map,
            );
            // Flatten modify_map into one index array (modify_rows) plus per-group offsets,
            // so the argument columns can be gathered with a single `take`.
            let mut modify_rows = UInt32Builder::with_capacity(0);
            // modify_index entries are (hash, align_ts, row_num).
            // row_num is only used to look up the group's `by` value, and every row in a group
            // shares that value, so any row of the group will do (the first one is recorded).
            let mut modify_index = Vec::with_capacity(self.modify_map.len());
            let mut offsets = vec![0];
            let mut offset_so_far = 0;
            for ((hash, ts), modify) in &self.modify_map {
                modify_rows.append_slice(modify);
                offset_so_far += modify.len();
                offsets.push(offset_so_far);
                modify_index.push((*hash, *ts, modify[0]));
            }
            let modify_rows = modify_rows.finish();
            // Reorder the argument columns so each group's rows are contiguous.
            let args = take_arrays(&args, &modify_rows, None)?;
            modify_index.iter().zip(offsets.windows(2)).try_for_each(
                |((hash, ts, row), offset)| {
                    // This group occupies args[offset[0]..offset[1]].
                    let (offset, length) = (offset[0], offset[1] - offset[0]);
                    let sliced_arrays: Vec<ArrayRef> = args
                        .iter()
                        .map(|array| array.slice(offset, length))
                        .collect();
                    let accumulators_map =
                        self.series_map.entry(*hash).or_insert_with(|| SeriesState {
                            row: by_rows.row(*row as usize).owned(),
                            align_ts_accumulator: BTreeMap::new(),
                        });
                    match accumulators_map.align_ts_accumulator.entry(*ts) {
                        Entry::Occupied(mut e) => {
                            let accumulators = e.get_mut();
                            accumulators[i].update_batch(&sliced_arrays)
                        }
                        Entry::Vacant(e) => {
                            // First data for this (series, align_ts): one more output row.
                            self.num_not_null_rows += 1;
                            // Accumulators are created for every range function, but only
                            // range_fn[i] is updated here; the others receive data (if any)
                            // in their own iteration of the outer loop.
                            let mut accumulators = self
                                .range_exec
                                .iter()
                                .map(|range| range.expr.create_accumulator())
                                .collect::<DfResult<Vec<_>>>()?;
                            let result = accumulators[i].update_batch(&sliced_arrays);
                            e.insert(accumulators);
                            result
                        }
                    }
                },
            )?;
        }
        Ok(())
    }

    /// Builds the single output batch from all accumulated series.
    ///
    /// Columns are laid out as `[range exprs..., time index, by columns...]` to match
    /// `schema_before_project`; `schema_project` is applied afterwards if set. When any range
    /// expression has a `fill` strategy, each series is expanded to every `align`-spaced
    /// timestamp between its first and last slot. Missing slots get the value an empty
    /// accumulator evaluates to. After that, `need_cast` and then `fill` are applied per
    /// series. Returns an empty batch with `self.schema` if no series were seen.
    fn generate_output(&mut self) -> DfResult<RecordBatch> {
        let _timer = self.metric.elapsed_compute().timer();
        if self.series_map.is_empty() {
            return Ok(RecordBatch::new_empty(self.schema.clone()));
        }
        // 1 for time index column
        let mut columns: Vec<Arc<dyn Array>> =
            Vec::with_capacity(1 + self.range_exec.len() + self.by.len());
        let mut ts_builder = TimestampMillisecondBuilder::with_capacity(self.num_not_null_rows);
        let mut all_scalar =
            vec![Vec::with_capacity(self.num_not_null_rows); self.range_exec.len()];
        let mut by_rows = Vec::with_capacity(self.num_not_null_rows);
        let mut start_index = 0;
        // If any range expr need fill, we need fill both the missing align_ts and null value.
        let need_fill_output = self.range_exec.iter().any(|range| range.fill.is_some());
        // The padding value for each accumulator: what a fresh, empty accumulator evaluates to.
        let padding_values = self
            .range_exec
            .iter()
            .map(|e| e.expr.create_accumulator()?.evaluate())
            .collect::<DfResult<Vec<_>>>()?;
        for SeriesState {
            row,
            align_ts_accumulator,
        } in self.series_map.values_mut()
        {
            // skip empty time series
            if align_ts_accumulator.is_empty() {
                continue;
            }
            // find the first and last align_ts
            let begin_ts = *align_ts_accumulator.first_key_value().unwrap().0;
            let end_ts = *align_ts_accumulator.last_key_value().unwrap().0;
            let align_ts = if need_fill_output {
                // enumerate every align_ts from begin to end, including slots without data
                (begin_ts..=end_ts).step_by(self.align as usize).collect()
            } else {
                align_ts_accumulator.keys().copied().collect::<Vec<_>>()
            };
            for ts in &align_ts {
                if let Some(slot) = align_ts_accumulator.get_mut(ts) {
                    for (column, acc) in all_scalar.iter_mut().zip(slot.iter_mut()) {
                        column.push(acc.evaluate()?);
                    }
                } else {
                    // empty time slot: use the padding value of each accumulator
                    for (column, padding) in all_scalar.iter_mut().zip(padding_values.iter()) {
                        column.push(padding.clone())
                    }
                }
            }
            ts_builder.append_slice(&align_ts);
            // apply cast and then fill strategy on this series' slice of each column
            for (
                i,
                RangeFnExec {
                    fill, need_cast, ..
                },
            ) in self.range_exec.iter().enumerate()
            {
                let time_series_data =
                    &mut all_scalar[i][start_index..start_index + align_ts.len()];
                if let Some(data_type) = need_cast {
                    cast_scalar_values(time_series_data, data_type)?;
                }
                if let Some(fill) = fill {
                    fill.apply_fill_strategy(&align_ts, time_series_data)?;
                }
            }
            by_rows.resize(by_rows.len() + align_ts.len(), row.row());
            start_index += align_ts.len();
        }
        for column_scalar in all_scalar {
            columns.push(ScalarValue::iter_to_array(column_scalar)?);
        }
        let ts_column = ts_builder.finish();
        // output schema before project follow the order of range expr | time index | by columns
        let ts_column = compute::cast(
            &ts_column,
            self.schema_before_project.field(columns.len()).data_type(),
        )?;
        columns.push(ts_column);
        columns.extend(self.row_converter.convert_rows(by_rows)?);
        let output = RecordBatch::try_new(self.schema_before_project.clone(), columns)?;
        let project_output = if let Some(project) = &self.schema_project {
            output.project(project)?
        } else {
            output
        };
        Ok(project_output)
    }
}
1155
/// Phases of the range-select stream's state machine, advanced by `poll_next`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ExecutionState {
    /// Consuming input batches and folding them into the accumulators.
    ReadingInput,
    /// Input is exhausted; the output batch is generated on the next step.
    ProducingOutput,
    /// Output has been emitted; polling yields `None`.
    Done,
}
1161
1162impl RecordBatchStream for RangeSelectStream {
1163    fn schema(&self) -> SchemaRef {
1164        self.schema.clone()
1165    }
1166}
1167
1168impl Stream for RangeSelectStream {
1169    type Item = DataFusionResult<RecordBatch>;
1170
1171    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1172        loop {
1173            match self.exec_state {
1174                ExecutionState::ReadingInput => {
1175                    match ready!(self.input.poll_next_unpin(cx)) {
1176                        // new batch to aggregate
1177                        Some(Ok(batch)) => {
1178                            if let Err(e) = self.update_range_context(batch) {
1179                                common_telemetry::debug!(
1180                                    "RangeSelectStream cannot update range context, schema: {:?}, err: {:?}", self.schema, e
1181                                );
1182                                return Poll::Ready(Some(Err(e)));
1183                            }
1184                        }
1185                        // inner had error, return to caller
1186                        Some(Err(e)) => return Poll::Ready(Some(Err(e))),
1187                        // inner is done, producing output
1188                        None => {
1189                            self.exec_state = ExecutionState::ProducingOutput;
1190                        }
1191                    }
1192                }
1193                ExecutionState::ProducingOutput => {
1194                    let result = self.generate_output();
1195                    return match result {
1196                        // made output
1197                        Ok(batch) => {
1198                            self.exec_state = ExecutionState::Done;
1199                            Poll::Ready(Some(Ok(batch)))
1200                        }
1201                        // error making output
1202                        Err(error) => Poll::Ready(Some(Err(error))),
1203                    };
1204                }
1205                ExecutionState::Done => return Poll::Ready(None),
1206            }
1207        }
1208    }
1209}
1210
1211#[cfg(test)]
1212mod test {
    // Builds an Arrow array from a literal list that may contain `null`.
    // For example, `nullable_array!(Int64; 0, null, 1)` creates an `Int64Builder`, appends
    // `0`, a null and `1`, then returns `builder.finish()`.
    // Arm order matters: the `null` arms must be tried before the `$value:literal` arms.
    macro_rules! nullable_array {
        // Empty remainder (empty list or trailing comma): nothing left to append.
        ($builder:ident,) => {
        };
        // Entry point: `<Type>; values...` creates a `<Type>Builder` and appends the values.
        ($array_type:ident ; $($tail:tt)*) => {
            paste::item! {
                {
                    let mut builder = arrow::array::[<$array_type Builder>]::new();
                    nullable_array!(builder, $($tail)*);
                    builder.finish()
                }
            }
        };
        // Last element is `null`.
        ($builder:ident, null) => {
            $builder.append_null();
        };
        // `null` followed by more elements.
        ($builder:ident, null, $($tail:tt)*) => {
            $builder.append_null();
            nullable_array!($builder, $($tail)*);
        };
        // Last element is a literal value.
        ($builder:ident, $value:literal) => {
            $builder.append_value($value);
        };
        // Literal value followed by more elements.
        ($builder:ident, $value:literal, $($tail:tt)*) => {
            $builder.append_value($value);
            nullable_array!($builder, $($tail)*);
        };
    }
1240
1241    use std::sync::Arc;
1242
1243    use arrow_schema::SortOptions;
1244    use datafusion::arrow::datatypes::{
1245        ArrowPrimitiveType, DataType, Field, Schema, TimestampMillisecondType,
1246    };
1247    use datafusion::functions_aggregate::min_max;
1248    use datafusion::physical_plan::memory::MemoryExec;
1249    use datafusion::physical_plan::sorts::sort::SortExec;
1250    use datafusion::prelude::SessionContext;
1251    use datafusion_physical_expr::expressions::Column;
1252    use datafusion_physical_expr::PhysicalSortExpr;
1253    use datatypes::arrow::array::TimestampMillisecondArray;
1254    use datatypes::arrow_array::StringArray;
1255
1256    use super::*;
1257
    /// Name of the time index column in the test input and output schemas.
    const TIME_INDEX_COLUMN: &str = "timestamp";
1259
1260    fn prepare_test_data(is_float: bool, is_gap: bool) -> MemoryExec {
1261        let schema = Arc::new(Schema::new(vec![
1262            Field::new(TIME_INDEX_COLUMN, TimestampMillisecondType::DATA_TYPE, true),
1263            Field::new(
1264                "value",
1265                if is_float {
1266                    DataType::Float64
1267                } else {
1268                    DataType::Int64
1269                },
1270                true,
1271            ),
1272            Field::new("host", DataType::Utf8, true),
1273        ]));
1274        let timestamp_column: Arc<dyn Array> = if !is_gap {
1275            Arc::new(TimestampMillisecondArray::from(vec![
1276                0, 5_000, 10_000, 15_000, 20_000, // host 1 every 5s
1277                0, 5_000, 10_000, 15_000, 20_000, // host 2 every 5s
1278            ])) as _
1279        } else {
1280            Arc::new(TimestampMillisecondArray::from(vec![
1281                0, 15_000, // host 1 every 5s, missing data on 5_000, 10_000
1282                0, 15_000, // host 2 every 5s, missing data on 5_000, 10_000
1283            ])) as _
1284        };
1285        let mut host = vec!["host1"; timestamp_column.len() / 2];
1286        host.extend(vec!["host2"; timestamp_column.len() / 2]);
1287        let mut value_column: Arc<dyn Array> = if is_gap {
1288            Arc::new(nullable_array!(Int64;
1289                0, 6, // data for host 1
1290                6, 12 // data for host 2
1291            )) as _
1292        } else {
1293            Arc::new(nullable_array!(Int64;
1294                0, null, 1, null, 2, // data for host 1
1295                3, null, 4, null, 5 // data for host 2
1296            )) as _
1297        };
1298        if is_float {
1299            value_column =
1300                cast_with_options(&value_column, &DataType::Float64, &CastOptions::default())
1301                    .unwrap();
1302        }
1303        let host_column: Arc<dyn Array> = Arc::new(StringArray::from(host)) as _;
1304        let data = RecordBatch::try_new(
1305            schema.clone(),
1306            vec![timestamp_column, value_column, host_column],
1307        )
1308        .unwrap();
1309
1310        MemoryExec::try_new(&[vec![data]], schema, None).unwrap()
1311    }
1312
    /// Runs a `RangeSelectExec` over `prepare_test_data(is_float, is_gap)` and compares the
    /// pretty-printed result with `expected`.
    ///
    /// The plan computes `MIN(value)` over `range1` and `MAX(value)` over `range2`, aligned
    /// every `align` ms from origin 0 and grouped by `host`, both using `fill`. The output is
    /// sorted by `host` and then by the time index, so the printed table is deterministic.
    async fn do_range_select_test(
        range1: Millisecond,
        range2: Millisecond,
        align: Millisecond,
        fill: Option<Fill>,
        is_float: bool,
        is_gap: bool,
        expected: String,
    ) {
        let data_type = if is_float {
            DataType::Float64
        } else {
            DataType::Int64
        };
        let (need_cast, schema_data_type) = if !is_float && matches!(fill, Some(Fill::Linear)) {
            // Linear fill on integer input: results are cast to Float64 before filling.
            (Some(DataType::Float64), DataType::Float64)
        } else {
            (None, data_type.clone())
        };
        let memory_exec = Arc::new(prepare_test_data(is_float, is_gap));
        // Output layout: range exprs | time index | by columns.
        let schema = Arc::new(Schema::new(vec![
            Field::new("MIN(value)", schema_data_type.clone(), true),
            Field::new("MAX(value)", schema_data_type, true),
            Field::new(TIME_INDEX_COLUMN, TimestampMillisecondType::DATA_TYPE, true),
            Field::new("host", DataType::Utf8, true),
        ]));
        // NOTE(review): the stream only emits after its input is exhausted, so
        // `EmissionType::Final` may describe it more accurately; confirm against the
        // production constructor.
        let cache = PlanProperties::new(
            EquivalenceProperties::new(schema.clone()),
            Partitioning::UnknownPartitioning(1),
            EmissionType::Incremental,
            Boundedness::Bounded,
        );
        let input_schema = memory_exec.schema().clone();
        let range_select_exec = Arc::new(RangeSelectExec {
            input: memory_exec,
            range_exec: vec![
                RangeFnExec {
                    expr: Arc::new(
                        AggregateExprBuilder::new(
                            min_max::min_udaf(),
                            vec![Arc::new(Column::new("value", 1))],
                        )
                        .schema(input_schema.clone())
                        .alias("MIN(value)")
                        .build()
                        .unwrap(),
                    ),
                    range: range1,
                    fill: fill.clone(),
                    need_cast: need_cast.clone(),
                },
                RangeFnExec {
                    expr: Arc::new(
                        AggregateExprBuilder::new(
                            min_max::max_udaf(),
                            vec![Arc::new(Column::new("value", 1))],
                        )
                        .schema(input_schema.clone())
                        .alias("MAX(value)")
                        .build()
                        .unwrap(),
                    ),
                    range: range2,
                    fill,
                    need_cast,
                },
            ],
            align,
            align_to: 0,
            by: vec![Arc::new(Column::new("host", 2))],
            time_index: TIME_INDEX_COLUMN.to_string(),
            schema: schema.clone(),
            schema_before_project: schema.clone(),
            schema_project: None,
            by_schema: Arc::new(Schema::new(vec![Field::new("host", DataType::Utf8, true)])),
            metric: ExecutionPlanMetricsSet::new(),
            cache,
        });
        // Column indices refer to the output schema above (host = 3, timestamp = 2).
        let sort_exec = SortExec::new(
            LexOrdering::new(vec![
                PhysicalSortExpr {
                    expr: Arc::new(Column::new("host", 3)),
                    options: SortOptions {
                        descending: false,
                        nulls_first: true,
                    },
                },
                PhysicalSortExpr {
                    expr: Arc::new(Column::new(TIME_INDEX_COLUMN, 2)),
                    options: SortOptions {
                        descending: false,
                        nulls_first: true,
                    },
                },
            ]),
            range_select_exec,
        );
        let session_context = SessionContext::default();
        let result =
            datafusion::physical_plan::collect(Arc::new(sort_exec), session_context.task_ctx())
                .await
                .unwrap();

        let result_literal = arrow::util::pretty::pretty_format_batches(&result)
            .unwrap()
            .to_string();

        assert_eq!(result_literal, expected);
    }
1423
1424    #[tokio::test]
1425    async fn range_10s_align_1000s() {
1426        let expected = String::from(
1427            "+------------+------------+---------------------+-------+\
1428            \n| MIN(value) | MAX(value) | timestamp           | host  |\
1429            \n+------------+------------+---------------------+-------+\
1430            \n| 0.0        | 0.0        | 1970-01-01T00:00:00 | host1 |\
1431            \n| 3.0        | 3.0        | 1970-01-01T00:00:00 | host2 |\
1432            \n+------------+------------+---------------------+-------+",
1433        );
1434        do_range_select_test(
1435            10_000,
1436            10_000,
1437            1_000_000,
1438            Some(Fill::Null),
1439            true,
1440            false,
1441            expected,
1442        )
1443        .await;
1444    }
1445
1446    #[tokio::test]
1447    async fn range_fill_null() {
1448        let expected = String::from(
1449            "+------------+------------+---------------------+-------+\
1450            \n| MIN(value) | MAX(value) | timestamp           | host  |\
1451            \n+------------+------------+---------------------+-------+\
1452            \n| 0.0        |            | 1969-12-31T23:59:55 | host1 |\
1453            \n| 0.0        | 0.0        | 1970-01-01T00:00:00 | host1 |\
1454            \n| 1.0        |            | 1970-01-01T00:00:05 | host1 |\
1455            \n| 1.0        | 1.0        | 1970-01-01T00:00:10 | host1 |\
1456            \n| 2.0        |            | 1970-01-01T00:00:15 | host1 |\
1457            \n| 2.0        | 2.0        | 1970-01-01T00:00:20 | host1 |\
1458            \n| 3.0        |            | 1969-12-31T23:59:55 | host2 |\
1459            \n| 3.0        | 3.0        | 1970-01-01T00:00:00 | host2 |\
1460            \n| 4.0        |            | 1970-01-01T00:00:05 | host2 |\
1461            \n| 4.0        | 4.0        | 1970-01-01T00:00:10 | host2 |\
1462            \n| 5.0        |            | 1970-01-01T00:00:15 | host2 |\
1463            \n| 5.0        | 5.0        | 1970-01-01T00:00:20 | host2 |\
1464            \n+------------+------------+---------------------+-------+",
1465        );
1466        do_range_select_test(
1467            10_000,
1468            5_000,
1469            5_000,
1470            Some(Fill::Null),
1471            true,
1472            false,
1473            expected,
1474        )
1475        .await;
1476    }
1477
1478    #[tokio::test]
1479    async fn range_fill_prev() {
1480        let expected = String::from(
1481            "+------------+------------+---------------------+-------+\
1482            \n| MIN(value) | MAX(value) | timestamp           | host  |\
1483            \n+------------+------------+---------------------+-------+\
1484            \n| 0.0        |            | 1969-12-31T23:59:55 | host1 |\
1485            \n| 0.0        | 0.0        | 1970-01-01T00:00:00 | host1 |\
1486            \n| 1.0        | 0.0        | 1970-01-01T00:00:05 | host1 |\
1487            \n| 1.0        | 1.0        | 1970-01-01T00:00:10 | host1 |\
1488            \n| 2.0        | 1.0        | 1970-01-01T00:00:15 | host1 |\
1489            \n| 2.0        | 2.0        | 1970-01-01T00:00:20 | host1 |\
1490            \n| 3.0        |            | 1969-12-31T23:59:55 | host2 |\
1491            \n| 3.0        | 3.0        | 1970-01-01T00:00:00 | host2 |\
1492            \n| 4.0        | 3.0        | 1970-01-01T00:00:05 | host2 |\
1493            \n| 4.0        | 4.0        | 1970-01-01T00:00:10 | host2 |\
1494            \n| 5.0        | 4.0        | 1970-01-01T00:00:15 | host2 |\
1495            \n| 5.0        | 5.0        | 1970-01-01T00:00:20 | host2 |\
1496            \n+------------+------------+---------------------+-------+",
1497        );
1498        do_range_select_test(
1499            10_000,
1500            5_000,
1501            5_000,
1502            Some(Fill::Prev),
1503            true,
1504            false,
1505            expected,
1506        )
1507        .await;
1508    }
1509
1510    #[tokio::test]
1511    async fn range_fill_linear() {
1512        let expected = String::from(
1513            "+------------+------------+---------------------+-------+\
1514            \n| MIN(value) | MAX(value) | timestamp           | host  |\
1515            \n+------------+------------+---------------------+-------+\
1516            \n| 0.0        | -0.5       | 1969-12-31T23:59:55 | host1 |\
1517            \n| 0.0        | 0.0        | 1970-01-01T00:00:00 | host1 |\
1518            \n| 1.0        | 0.5        | 1970-01-01T00:00:05 | host1 |\
1519            \n| 1.0        | 1.0        | 1970-01-01T00:00:10 | host1 |\
1520            \n| 2.0        | 1.5        | 1970-01-01T00:00:15 | host1 |\
1521            \n| 2.0        | 2.0        | 1970-01-01T00:00:20 | host1 |\
1522            \n| 3.0        | 2.5        | 1969-12-31T23:59:55 | host2 |\
1523            \n| 3.0        | 3.0        | 1970-01-01T00:00:00 | host2 |\
1524            \n| 4.0        | 3.5        | 1970-01-01T00:00:05 | host2 |\
1525            \n| 4.0        | 4.0        | 1970-01-01T00:00:10 | host2 |\
1526            \n| 5.0        | 4.5        | 1970-01-01T00:00:15 | host2 |\
1527            \n| 5.0        | 5.0        | 1970-01-01T00:00:20 | host2 |\
1528            \n+------------+------------+---------------------+-------+",
1529        );
1530        do_range_select_test(
1531            10_000,
1532            5_000,
1533            5_000,
1534            Some(Fill::Linear),
1535            true,
1536            false,
1537            expected,
1538        )
1539        .await;
1540    }
1541
1542    #[tokio::test]
1543    async fn range_fill_integer_linear() {
1544        let expected = String::from(
1545            "+------------+------------+---------------------+-------+\
1546            \n| MIN(value) | MAX(value) | timestamp           | host  |\
1547            \n+------------+------------+---------------------+-------+\
1548            \n| 0.0        | -0.5       | 1969-12-31T23:59:55 | host1 |\
1549            \n| 0.0        | 0.0        | 1970-01-01T00:00:00 | host1 |\
1550            \n| 1.0        | 0.5        | 1970-01-01T00:00:05 | host1 |\
1551            \n| 1.0        | 1.0        | 1970-01-01T00:00:10 | host1 |\
1552            \n| 2.0        | 1.5        | 1970-01-01T00:00:15 | host1 |\
1553            \n| 2.0        | 2.0        | 1970-01-01T00:00:20 | host1 |\
1554            \n| 3.0        | 2.5        | 1969-12-31T23:59:55 | host2 |\
1555            \n| 3.0        | 3.0        | 1970-01-01T00:00:00 | host2 |\
1556            \n| 4.0        | 3.5        | 1970-01-01T00:00:05 | host2 |\
1557            \n| 4.0        | 4.0        | 1970-01-01T00:00:10 | host2 |\
1558            \n| 5.0        | 4.5        | 1970-01-01T00:00:15 | host2 |\
1559            \n| 5.0        | 5.0        | 1970-01-01T00:00:20 | host2 |\
1560            \n+------------+------------+---------------------+-------+",
1561        );
1562        do_range_select_test(
1563            10_000,
1564            5_000,
1565            5_000,
1566            Some(Fill::Linear),
1567            false,
1568            false,
1569            expected,
1570        )
1571        .await;
1572    }
1573
1574    #[tokio::test]
1575    async fn range_fill_const() {
1576        let expected = String::from(
1577            "+------------+------------+---------------------+-------+\
1578            \n| MIN(value) | MAX(value) | timestamp           | host  |\
1579            \n+------------+------------+---------------------+-------+\
1580            \n| 0.0        | 6.6        | 1969-12-31T23:59:55 | host1 |\
1581            \n| 0.0        | 0.0        | 1970-01-01T00:00:00 | host1 |\
1582            \n| 1.0        | 6.6        | 1970-01-01T00:00:05 | host1 |\
1583            \n| 1.0        | 1.0        | 1970-01-01T00:00:10 | host1 |\
1584            \n| 2.0        | 6.6        | 1970-01-01T00:00:15 | host1 |\
1585            \n| 2.0        | 2.0        | 1970-01-01T00:00:20 | host1 |\
1586            \n| 3.0        | 6.6        | 1969-12-31T23:59:55 | host2 |\
1587            \n| 3.0        | 3.0        | 1970-01-01T00:00:00 | host2 |\
1588            \n| 4.0        | 6.6        | 1970-01-01T00:00:05 | host2 |\
1589            \n| 4.0        | 4.0        | 1970-01-01T00:00:10 | host2 |\
1590            \n| 5.0        | 6.6        | 1970-01-01T00:00:15 | host2 |\
1591            \n| 5.0        | 5.0        | 1970-01-01T00:00:20 | host2 |\
1592            \n+------------+------------+---------------------+-------+",
1593        );
1594        do_range_select_test(
1595            10_000,
1596            5_000,
1597            5_000,
1598            Some(Fill::Const(ScalarValue::Float64(Some(6.6)))),
1599            true,
1600            false,
1601            expected,
1602        )
1603        .await;
1604    }
1605
1606    #[tokio::test]
1607    async fn range_fill_gap() {
1608        let expected = String::from(
1609            "+------------+------------+---------------------+-------+\
1610            \n| MIN(value) | MAX(value) | timestamp           | host  |\
1611            \n+------------+------------+---------------------+-------+\
1612            \n| 0.0        | 0.0        | 1970-01-01T00:00:00 | host1 |\
1613            \n| 6.0        | 6.0        | 1970-01-01T00:00:15 | host1 |\
1614            \n| 6.0        | 6.0        | 1970-01-01T00:00:00 | host2 |\
1615            \n| 12.0       | 12.0       | 1970-01-01T00:00:15 | host2 |\
1616            \n+------------+------------+---------------------+-------+",
1617        );
1618        do_range_select_test(5_000, 5_000, 5_000, None, true, true, expected).await;
1619        let expected = String::from(
1620            "+------------+------------+---------------------+-------+\
1621            \n| MIN(value) | MAX(value) | timestamp           | host  |\
1622            \n+------------+------------+---------------------+-------+\
1623            \n| 0.0        | 0.0        | 1970-01-01T00:00:00 | host1 |\
1624            \n|            |            | 1970-01-01T00:00:05 | host1 |\
1625            \n|            |            | 1970-01-01T00:00:10 | host1 |\
1626            \n| 6.0        | 6.0        | 1970-01-01T00:00:15 | host1 |\
1627            \n| 6.0        | 6.0        | 1970-01-01T00:00:00 | host2 |\
1628            \n|            |            | 1970-01-01T00:00:05 | host2 |\
1629            \n|            |            | 1970-01-01T00:00:10 | host2 |\
1630            \n| 12.0       | 12.0       | 1970-01-01T00:00:15 | host2 |\
1631            \n+------------+------------+---------------------+-------+",
1632        );
1633        do_range_select_test(5_000, 5_000, 5_000, Some(Fill::Null), true, true, expected).await;
1634        let expected = String::from(
1635            "+------------+------------+---------------------+-------+\
1636            \n| MIN(value) | MAX(value) | timestamp           | host  |\
1637            \n+------------+------------+---------------------+-------+\
1638            \n| 0.0        | 0.0        | 1970-01-01T00:00:00 | host1 |\
1639            \n| 0.0        | 0.0        | 1970-01-01T00:00:05 | host1 |\
1640            \n| 0.0        | 0.0        | 1970-01-01T00:00:10 | host1 |\
1641            \n| 6.0        | 6.0        | 1970-01-01T00:00:15 | host1 |\
1642            \n| 6.0        | 6.0        | 1970-01-01T00:00:00 | host2 |\
1643            \n| 6.0        | 6.0        | 1970-01-01T00:00:05 | host2 |\
1644            \n| 6.0        | 6.0        | 1970-01-01T00:00:10 | host2 |\
1645            \n| 12.0       | 12.0       | 1970-01-01T00:00:15 | host2 |\
1646            \n+------------+------------+---------------------+-------+",
1647        );
1648        do_range_select_test(5_000, 5_000, 5_000, Some(Fill::Prev), true, true, expected).await;
1649        let expected = String::from(
1650            "+------------+------------+---------------------+-------+\
1651            \n| MIN(value) | MAX(value) | timestamp           | host  |\
1652            \n+------------+------------+---------------------+-------+\
1653            \n| 0.0        | 0.0        | 1970-01-01T00:00:00 | host1 |\
1654            \n| 2.0        | 2.0        | 1970-01-01T00:00:05 | host1 |\
1655            \n| 4.0        | 4.0        | 1970-01-01T00:00:10 | host1 |\
1656            \n| 6.0        | 6.0        | 1970-01-01T00:00:15 | host1 |\
1657            \n| 6.0        | 6.0        | 1970-01-01T00:00:00 | host2 |\
1658            \n| 8.0        | 8.0        | 1970-01-01T00:00:05 | host2 |\
1659            \n| 10.0       | 10.0       | 1970-01-01T00:00:10 | host2 |\
1660            \n| 12.0       | 12.0       | 1970-01-01T00:00:15 | host2 |\
1661            \n+------------+------------+---------------------+-------+",
1662        );
1663        do_range_select_test(
1664            5_000,
1665            5_000,
1666            5_000,
1667            Some(Fill::Linear),
1668            true,
1669            true,
1670            expected,
1671        )
1672        .await;
1673        let expected = String::from(
1674            "+------------+------------+---------------------+-------+\
1675            \n| MIN(value) | MAX(value) | timestamp           | host  |\
1676            \n+------------+------------+---------------------+-------+\
1677            \n| 0.0        | 0.0        | 1970-01-01T00:00:00 | host1 |\
1678            \n| 6.0        | 6.0        | 1970-01-01T00:00:05 | host1 |\
1679            \n| 6.0        | 6.0        | 1970-01-01T00:00:10 | host1 |\
1680            \n| 6.0        | 6.0        | 1970-01-01T00:00:15 | host1 |\
1681            \n| 6.0        | 6.0        | 1970-01-01T00:00:00 | host2 |\
1682            \n| 6.0        | 6.0        | 1970-01-01T00:00:05 | host2 |\
1683            \n| 6.0        | 6.0        | 1970-01-01T00:00:10 | host2 |\
1684            \n| 12.0       | 12.0       | 1970-01-01T00:00:15 | host2 |\
1685            \n+------------+------------+---------------------+-------+",
1686        );
1687        do_range_select_test(
1688            5_000,
1689            5_000,
1690            5_000,
1691            Some(Fill::Const(ScalarValue::Float64(Some(6.0)))),
1692            true,
1693            true,
1694            expected,
1695        )
1696        .await;
1697    }
1698
1699    #[test]
1700    fn fill_test() {
1701        assert!(Fill::try_from_str("", &DataType::UInt8).unwrap().is_none());
1702        assert!(Fill::try_from_str("Linear", &DataType::UInt8).unwrap() == Some(Fill::Linear));
1703        assert_eq!(
1704            Fill::try_from_str("Linear", &DataType::Boolean)
1705                .unwrap_err()
1706                .to_string(),
1707            "Error during planning: Use FILL LINEAR on Non-numeric DataType Boolean"
1708        );
1709        assert_eq!(
1710            Fill::try_from_str("WHAT", &DataType::UInt8)
1711                .unwrap_err()
1712                .to_string(),
1713                "Error during planning: WHAT is not a valid fill option, fail to convert to a const value. { Arrow error: Cast error: Cannot cast string 'WHAT' to value of UInt8 type }"
1714        );
1715        assert_eq!(
1716            Fill::try_from_str("8.0", &DataType::UInt8)
1717                .unwrap_err()
1718                .to_string(),
1719                "Error during planning: 8.0 is not a valid fill option, fail to convert to a const value. { Arrow error: Cast error: Cannot cast string '8.0' to value of UInt8 type }"
1720        );
1721        assert!(
1722            Fill::try_from_str("8", &DataType::UInt8).unwrap()
1723                == Some(Fill::Const(ScalarValue::UInt8(Some(8))))
1724        );
1725        let mut test1 = vec![
1726            ScalarValue::UInt8(Some(8)),
1727            ScalarValue::UInt8(None),
1728            ScalarValue::UInt8(Some(9)),
1729        ];
1730        Fill::Null.apply_fill_strategy(&[], &mut test1).unwrap();
1731        assert_eq!(test1[1], ScalarValue::UInt8(None));
1732        Fill::Prev.apply_fill_strategy(&[], &mut test1).unwrap();
1733        assert_eq!(test1[1], ScalarValue::UInt8(Some(8)));
1734        test1[1] = ScalarValue::UInt8(None);
1735        Fill::Const(ScalarValue::UInt8(Some(10)))
1736            .apply_fill_strategy(&[], &mut test1)
1737            .unwrap();
1738        assert_eq!(test1[1], ScalarValue::UInt8(Some(10)));
1739    }
1740
1741    #[test]
1742    fn test_fill_linear() {
1743        let ts = vec![1, 2, 3, 4, 5];
1744        let mut test = vec![
1745            ScalarValue::Float32(Some(1.0)),
1746            ScalarValue::Float32(None),
1747            ScalarValue::Float32(Some(3.0)),
1748            ScalarValue::Float32(None),
1749            ScalarValue::Float32(Some(5.0)),
1750        ];
1751        Fill::Linear.apply_fill_strategy(&ts, &mut test).unwrap();
1752        let mut test1 = vec![
1753            ScalarValue::Float32(None),
1754            ScalarValue::Float32(Some(2.0)),
1755            ScalarValue::Float32(None),
1756            ScalarValue::Float32(Some(4.0)),
1757            ScalarValue::Float32(None),
1758        ];
1759        Fill::Linear.apply_fill_strategy(&ts, &mut test1).unwrap();
1760        assert_eq!(test, test1);
1761        // test linear interpolation on irregularly spaced ts/data
1762        let ts = vec![
1763            1,   // None
1764            3,   // 1.0
1765            8,   // 11.0
1766            30,  // None
1767            88,  // 10.0
1768            108, // 5.0
1769            128, // None
1770        ];
1771        let mut test = vec![
1772            ScalarValue::Float64(None),
1773            ScalarValue::Float64(Some(1.0)),
1774            ScalarValue::Float64(Some(11.0)),
1775            ScalarValue::Float64(None),
1776            ScalarValue::Float64(Some(10.0)),
1777            ScalarValue::Float64(Some(5.0)),
1778            ScalarValue::Float64(None),
1779        ];
1780        Fill::Linear.apply_fill_strategy(&ts, &mut test).unwrap();
1781        let data: Vec<_> = test
1782            .into_iter()
1783            .map(|x| {
1784                let ScalarValue::Float64(Some(f)) = x else {
1785                    unreachable!()
1786                };
1787                f
1788            })
1789            .collect();
1790        assert_eq!(data, vec![-3.0, 1.0, 11.0, 10.725, 10.0, 5.0, 0.0]);
1791        // test corner case
1792        let ts = vec![1];
1793        let test = vec![ScalarValue::Float32(None)];
1794        let mut test1 = test.clone();
1795        Fill::Linear.apply_fill_strategy(&ts, &mut test1).unwrap();
1796        assert_eq!(test, test1);
1797    }
1798}