query/range_select/
plan.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp::Ordering;
16use std::collections::btree_map::Entry;
17use std::collections::{BTreeMap, HashMap};
18use std::fmt::Display;
19use std::pin::Pin;
20use std::sync::Arc;
21use std::task::{Context, Poll};
22use std::time::Duration;
23
24use ahash::RandomState;
25use arrow::compute::{self, cast_with_options, take_arrays, CastOptions};
26use arrow_schema::{DataType, Field, Schema, SchemaRef, SortOptions, TimeUnit};
27use common_recordbatch::DfSendableRecordBatchStream;
28use datafusion::common::Result as DataFusionResult;
29use datafusion::error::Result as DfResult;
30use datafusion::execution::context::SessionState;
31use datafusion::execution::TaskContext;
32use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
33use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
34use datafusion::physical_plan::{
35    DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, RecordBatchStream,
36    SendableRecordBatchStream,
37};
38use datafusion_common::hash_utils::create_hashes;
39use datafusion_common::{DFSchema, DFSchemaRef, DataFusionError, ScalarValue};
40use datafusion_expr::utils::{exprlist_to_fields, COUNT_STAR_EXPANSION};
41use datafusion_expr::{
42    lit, Accumulator, Expr, ExprSchemable, LogicalPlan, UserDefinedLogicalNodeCore,
43};
44use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr};
45use datafusion_physical_expr::{
46    create_physical_expr, create_physical_sort_expr, Distribution, EquivalenceProperties,
47    Partitioning, PhysicalExpr, PhysicalSortExpr,
48};
49use datatypes::arrow::array::{
50    Array, ArrayRef, TimestampMillisecondArray, TimestampMillisecondBuilder, UInt32Builder,
51};
52use datatypes::arrow::datatypes::{ArrowPrimitiveType, TimestampMillisecondType};
53use datatypes::arrow::record_batch::RecordBatch;
54use datatypes::arrow::row::{OwnedRow, RowConverter, SortField};
55use futures::{ready, Stream};
56use futures_util::StreamExt;
57use snafu::ensure;
58
59use crate::error::{RangeQuerySnafu, Result};
60
61type Millisecond = <TimestampMillisecondType as ArrowPrimitiveType>::Native;
62
63#[derive(PartialEq, Eq, Debug, Hash, Clone)]
64pub enum Fill {
65    Null,
66    Prev,
67    Linear,
68    Const(ScalarValue),
69}
70
71impl Display for Fill {
72    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
73        match self {
74            Fill::Null => write!(f, "NULL"),
75            Fill::Prev => write!(f, "PREV"),
76            Fill::Linear => write!(f, "LINEAR"),
77            Fill::Const(x) => write!(f, "{}", x),
78        }
79    }
80}
81
82impl Fill {
83    pub fn try_from_str(value: &str, datatype: &DataType) -> DfResult<Option<Self>> {
84        let s = value.to_uppercase();
85        match s.as_str() {
86            "" => Ok(None),
87            "NULL" => Ok(Some(Self::Null)),
88            "PREV" => Ok(Some(Self::Prev)),
89            "LINEAR" => {
90                if datatype.is_numeric() {
91                    Ok(Some(Self::Linear))
92                } else {
93                    Err(DataFusionError::Plan(format!(
94                        "Use FILL LINEAR on Non-numeric DataType {}",
95                        datatype
96                    )))
97                }
98            }
99            _ => ScalarValue::try_from_string(s.clone(), datatype)
100                .map_err(|err| {
101                    DataFusionError::Plan(format!(
102                        "{} is not a valid fill option, fail to convert to a const value. {{ {} }}",
103                        s, err
104                    ))
105                })
106                .map(|x| Some(Fill::Const(x))),
107        }
108    }
109
110    /// The input `data` contains data on a complete time series.
111    /// If the filling strategy is `PREV` or `LINEAR`, caller must be ensured that the incoming `ts`&`data` is ascending time order.
112    pub fn apply_fill_strategy(&self, ts: &[i64], data: &mut [ScalarValue]) -> DfResult<()> {
113        // No calculation need in `Fill::Null`
114        if matches!(self, Fill::Null) {
115            return Ok(());
116        }
117        let len = data.len();
118        if *self == Fill::Linear {
119            return Self::fill_linear(ts, data);
120        }
121        for i in 0..len {
122            if data[i].is_null() {
123                match self {
124                    Fill::Prev => {
125                        if i != 0 {
126                            data[i] = data[i - 1].clone()
127                        }
128                    }
129                    // The calculation of linear interpolation is relatively complicated.
130                    // `Self::fill_linear` is used to dispose `Fill::Linear`.
131                    // No calculation need in `Fill::Null`
132                    Fill::Linear | Fill::Null => unreachable!(),
133                    Fill::Const(v) => data[i] = v.clone(),
134                }
135            }
136        }
137        Ok(())
138    }
139
140    fn fill_linear(ts: &[i64], data: &mut [ScalarValue]) -> DfResult<()> {
141        let not_null_num = data
142            .iter()
143            .fold(0, |acc, x| if x.is_null() { acc } else { acc + 1 });
144        // We need at least two non-empty data points to perform linear interpolation
145        if not_null_num < 2 {
146            return Ok(());
147        }
148        let mut index = 0;
149        let mut head: Option<usize> = None;
150        let mut tail: Option<usize> = None;
151        while index < data.len() {
152            // find null interval [start, end)
153            // start is null, end is not-null
154            let start = data[index..]
155                .iter()
156                .position(ScalarValue::is_null)
157                .unwrap_or(data.len() - index)
158                + index;
159            if start == data.len() {
160                break;
161            }
162            let end = data[start..]
163                .iter()
164                .position(|r| !r.is_null())
165                .unwrap_or(data.len() - start)
166                + start;
167            index = end + 1;
168            // head or tail null dispose later, record start/end first
169            if start == 0 {
170                head = Some(end);
171            } else if end == data.len() {
172                tail = Some(start);
173            } else {
174                linear_interpolation(ts, data, start - 1, end, start, end)?;
175            }
176        }
177        // dispose head null interval
178        if let Some(end) = head {
179            linear_interpolation(ts, data, end, end + 1, 0, end)?;
180        }
181        // dispose tail null interval
182        if let Some(start) = tail {
183            linear_interpolation(ts, data, start - 2, start - 1, start, data.len())?;
184        }
185        Ok(())
186    }
187}
188
189/// use `(ts[i1], data[i1])`, `(ts[i2], data[i2])` as endpoint, linearly interpolates element over the interval `[start, end)`
190fn linear_interpolation(
191    ts: &[i64],
192    data: &mut [ScalarValue],
193    i1: usize,
194    i2: usize,
195    start: usize,
196    end: usize,
197) -> DfResult<()> {
198    let (x0, x1) = (ts[i1] as f64, ts[i2] as f64);
199    let (y0, y1, is_float32) = match (&data[i1], &data[i2]) {
200        (ScalarValue::Float64(Some(y0)), ScalarValue::Float64(Some(y1))) => (*y0, *y1, false),
201        (ScalarValue::Float32(Some(y0)), ScalarValue::Float32(Some(y1))) => {
202            (*y0 as f64, *y1 as f64, true)
203        }
204        _ => {
205            return Err(DataFusionError::Execution(
206                "RangePlan: Apply Fill LINEAR strategy on Non-floating type".to_string(),
207            ));
208        }
209    };
210    // To avoid divide zero error, kind of defensive programming
211    if x1 == x0 {
212        return Err(DataFusionError::Execution(
213            "RangePlan: Linear interpolation using the same coordinate points".to_string(),
214        ));
215    }
216    for i in start..end {
217        let val = y0 + (y1 - y0) / (x1 - x0) * (ts[i] as f64 - x0);
218        data[i] = if is_float32 {
219            ScalarValue::Float32(Some(val as f32))
220        } else {
221            ScalarValue::Float64(Some(val))
222        }
223    }
224    Ok(())
225}
226
227#[derive(Eq, Clone, Debug)]
228pub struct RangeFn {
229    /// with format like `max(a) RANGE 300s [FILL NULL]`
230    pub name: String,
231    pub data_type: DataType,
232    pub expr: Expr,
233    pub range: Duration,
234    pub fill: Option<Fill>,
235    /// If the `FIll` strategy is `Linear` and the output is an integer,
236    /// it is possible to calculate a floating point number.
237    /// So for `FILL==LINEAR`, the entire data will be implicitly converted to Float type
238    /// If `need_cast==true`, `data_type` may not consist with type `expr` generated.
239    pub need_cast: bool,
240}
241
242impl PartialEq for RangeFn {
243    fn eq(&self, other: &Self) -> bool {
244        self.name == other.name
245    }
246}
247
248impl PartialOrd for RangeFn {
249    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
250        Some(self.cmp(other))
251    }
252}
253
254impl Ord for RangeFn {
255    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
256        self.name.cmp(&other.name)
257    }
258}
259
260impl std::hash::Hash for RangeFn {
261    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
262        self.name.hash(state);
263    }
264}
265
266impl Display for RangeFn {
267    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
268        write!(f, "{}", self.name)
269    }
270}
271
272#[derive(Debug, PartialEq, Eq, Hash)]
273pub struct RangeSelect {
274    /// The incoming logical plan
275    pub input: Arc<LogicalPlan>,
276    /// all range expressions
277    pub range_expr: Vec<RangeFn>,
278    pub align: Duration,
279    pub align_to: i64,
280    pub time_index: String,
281    pub time_expr: Expr,
282    pub by: Vec<Expr>,
283    pub schema: DFSchemaRef,
284    pub by_schema: DFSchemaRef,
285    /// If the `schema` of the `RangeSelect` happens to be the same as the content of the upper-level Projection Plan,
286    /// the final output needs to be `project` through `schema_project`,
287    /// so that we can omit the upper-level Projection Plan.
288    pub schema_project: Option<Vec<usize>>,
289    /// The schema before run projection, follow the order of `range expr | time index | by columns`
290    /// `schema_before_project  ----  schema_project ----> schema`
291    /// if `schema_project==None` then `schema_before_project==schema`
292    pub schema_before_project: DFSchemaRef,
293}
294
295impl PartialOrd for RangeSelect {
296    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
297        // Compare fields in order excluding `schema`, `by_schema`, `schema_before_project`.
298        match self.input.partial_cmp(&other.input) {
299            Some(Ordering::Equal) => {}
300            ord => return ord,
301        }
302        match self.range_expr.partial_cmp(&other.range_expr) {
303            Some(Ordering::Equal) => {}
304            ord => return ord,
305        }
306        match self.align.partial_cmp(&other.align) {
307            Some(Ordering::Equal) => {}
308            ord => return ord,
309        }
310        match self.align_to.partial_cmp(&other.align_to) {
311            Some(Ordering::Equal) => {}
312            ord => return ord,
313        }
314        match self.time_index.partial_cmp(&other.time_index) {
315            Some(Ordering::Equal) => {}
316            ord => return ord,
317        }
318        match self.time_expr.partial_cmp(&other.time_expr) {
319            Some(Ordering::Equal) => {}
320            ord => return ord,
321        }
322        match self.by.partial_cmp(&other.by) {
323            Some(Ordering::Equal) => {}
324            ord => return ord,
325        }
326        self.schema_project.partial_cmp(&other.schema_project)
327    }
328}
329
330impl RangeSelect {
331    pub fn try_new(
332        input: Arc<LogicalPlan>,
333        range_expr: Vec<RangeFn>,
334        align: Duration,
335        align_to: i64,
336        time_index: Expr,
337        by: Vec<Expr>,
338        projection_expr: &[Expr],
339    ) -> Result<Self> {
340        ensure!(
341            align.as_millis() != 0,
342            RangeQuerySnafu {
343                msg: "Can't use 0 as align in Range Query"
344            }
345        );
346        for expr in &range_expr {
347            ensure!(
348                expr.range.as_millis() != 0,
349                RangeQuerySnafu {
350                    msg: format!(
351                        "Invalid Range expr `{}`, Can't use 0 as range in Range Query",
352                        expr.name
353                    )
354                }
355            );
356        }
357        let mut fields = range_expr
358            .iter()
359            .map(
360                |RangeFn {
361                     name,
362                     data_type,
363                     fill,
364                     ..
365                 }| {
366                    let field = Field::new(
367                        name,
368                        data_type.clone(),
369                        // Only when data fill with Const option, the data can't be null
370                        !matches!(fill, Some(Fill::Const(..))),
371                    );
372                    Ok((None, Arc::new(field)))
373                },
374            )
375            .collect::<DfResult<Vec<_>>>()?;
376        // add align_ts
377        let ts_field = time_index.to_field(input.schema().as_ref())?;
378        let time_index_name = ts_field.1.name().clone();
379        fields.push(ts_field);
380        // add by
381        let by_fields = exprlist_to_fields(&by, &input)?;
382        fields.extend(by_fields.clone());
383        let schema_before_project = Arc::new(DFSchema::new_with_metadata(
384            fields,
385            input.schema().metadata().clone(),
386        )?);
387        let by_schema = Arc::new(DFSchema::new_with_metadata(
388            by_fields,
389            input.schema().metadata().clone(),
390        )?);
391        // If the results of project plan can be obtained directly from range plan without any additional
392        // calculations, no project plan is required. We can simply project the final output of the range
393        // plan to produce the final result.
394        let schema_project = projection_expr
395            .iter()
396            .map(|project_expr| {
397                if let Expr::Column(column) = project_expr {
398                    schema_before_project
399                        .index_of_column_by_name(column.relation.as_ref(), &column.name)
400                        .ok_or(())
401                } else {
402                    let (qualifier, field) = project_expr
403                        .to_field(input.schema().as_ref())
404                        .map_err(|_| ())?;
405                    schema_before_project
406                        .index_of_column_by_name(qualifier.as_ref(), field.name())
407                        .ok_or(())
408                }
409            })
410            .collect::<std::result::Result<Vec<usize>, ()>>()
411            .ok();
412        let schema = if let Some(project) = &schema_project {
413            let project_field = project
414                .iter()
415                .map(|i| {
416                    let f = schema_before_project.qualified_field(*i);
417                    (f.0.cloned(), Arc::new(f.1.clone()))
418                })
419                .collect();
420            Arc::new(DFSchema::new_with_metadata(
421                project_field,
422                input.schema().metadata().clone(),
423            )?)
424        } else {
425            schema_before_project.clone()
426        };
427        Ok(Self {
428            input,
429            range_expr,
430            align,
431            align_to,
432            time_index: time_index_name,
433            time_expr: time_index,
434            schema,
435            by_schema,
436            by,
437            schema_project,
438            schema_before_project,
439        })
440    }
441}
442
443impl UserDefinedLogicalNodeCore for RangeSelect {
444    fn name(&self) -> &str {
445        "RangeSelect"
446    }
447
448    fn inputs(&self) -> Vec<&LogicalPlan> {
449        vec![&self.input]
450    }
451
452    fn schema(&self) -> &DFSchemaRef {
453        &self.schema
454    }
455
456    fn expressions(&self) -> Vec<Expr> {
457        self.range_expr
458            .iter()
459            .map(|expr| expr.expr.clone())
460            .chain([self.time_expr.clone()])
461            .chain(self.by.clone())
462            .collect()
463    }
464
465    fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
466        write!(
467            f,
468            "RangeSelect: range_exprs=[{}], align={}ms, align_to={}ms, align_by=[{}], time_index={}",
469            self.range_expr
470                .iter()
471                .map(ToString::to_string)
472                .collect::<Vec<_>>()
473                .join(", "),
474            self.align.as_millis(),
475            self.align_to,
476            self.by
477                .iter()
478                .map(ToString::to_string)
479                .collect::<Vec<_>>()
480                .join(", "),
481            self.time_index
482        )
483    }
484
485    fn with_exprs_and_inputs(
486        &self,
487        exprs: Vec<Expr>,
488        inputs: Vec<LogicalPlan>,
489    ) -> DataFusionResult<Self> {
490        if inputs.is_empty() {
491            return Err(DataFusionError::Plan(
492                "RangeSelect: inputs is empty".to_string(),
493            ));
494        }
495        if exprs.len() != self.range_expr.len() + self.by.len() + 1 {
496            return Err(DataFusionError::Plan(
497                "RangeSelect: exprs length not match".to_string(),
498            ));
499        }
500
501        let range_expr = exprs
502            .iter()
503            .zip(self.range_expr.iter())
504            .map(|(e, range)| RangeFn {
505                name: range.name.clone(),
506                data_type: range.data_type.clone(),
507                expr: e.clone(),
508                range: range.range,
509                fill: range.fill.clone(),
510                need_cast: range.need_cast,
511            })
512            .collect();
513        let time_expr = exprs[self.range_expr.len()].clone();
514        let by = exprs[self.range_expr.len() + 1..].to_vec();
515        Ok(Self {
516            align: self.align,
517            align_to: self.align_to,
518            range_expr,
519            input: Arc::new(inputs[0].clone()),
520            time_index: self.time_index.clone(),
521            time_expr,
522            schema: self.schema.clone(),
523            by,
524            by_schema: self.by_schema.clone(),
525            schema_project: self.schema_project.clone(),
526            schema_before_project: self.schema_before_project.clone(),
527        })
528    }
529}
530
531impl RangeSelect {
532    fn create_physical_expr_list(
533        &self,
534        is_count_aggr: bool,
535        exprs: &[Expr],
536        df_schema: &Arc<DFSchema>,
537        session_state: &SessionState,
538    ) -> DfResult<Vec<Arc<dyn PhysicalExpr>>> {
539        exprs
540            .iter()
541            .map(|e| match e {
542                // `count(*)` will be rewritten by `CountWildcardRule` into `count(1)` when optimizing logical plan.
543                // The modification occurs after range plan rewrite.
544                // At this time, aggregate plan has been replaced by a custom range plan,
545                // so `CountWildcardRule` has not been applied.
546                // We manually modify it when creating the physical plan.
547                #[expect(deprecated)]
548                Expr::Wildcard { .. } if is_count_aggr => create_physical_expr(
549                    &lit(COUNT_STAR_EXPANSION),
550                    df_schema.as_ref(),
551                    session_state.execution_props(),
552                ),
553                _ => create_physical_expr(e, df_schema.as_ref(), session_state.execution_props()),
554            })
555            .collect::<DfResult<Vec<_>>>()
556    }
557
558    pub fn to_execution_plan(
559        &self,
560        logical_input: &LogicalPlan,
561        exec_input: Arc<dyn ExecutionPlan>,
562        session_state: &SessionState,
563    ) -> DfResult<Arc<dyn ExecutionPlan>> {
564        let fields: Vec<_> = self
565            .schema_before_project
566            .fields()
567            .iter()
568            .map(|field| Field::new(field.name(), field.data_type().clone(), field.is_nullable()))
569            .collect();
570        let by_fields: Vec<_> = self
571            .by_schema
572            .fields()
573            .iter()
574            .map(|field| Field::new(field.name(), field.data_type().clone(), field.is_nullable()))
575            .collect();
576        let input_dfschema = logical_input.schema();
577        let input_schema = exec_input.schema();
578        let range_exec: Vec<RangeFnExec> = self
579            .range_expr
580            .iter()
581            .map(|range_fn| {
582                let name = range_fn.expr.schema_name().to_string();
583                let range_expr = match &range_fn.expr {
584                    Expr::Alias(expr) => expr.expr.as_ref(),
585                    others => others,
586                };
587
588                let expr = match &range_expr {
589                    Expr::AggregateFunction(aggr)
590                        if (aggr.func.name() == "last_value"
591                            || aggr.func.name() == "first_value") =>
592                    {
593                        let order_by = if !aggr.params.order_by.is_empty() {
594                            aggr.params
595                                .order_by
596                                .iter()
597                                .map(|x| {
598                                    create_physical_sort_expr(
599                                        x,
600                                        input_dfschema.as_ref(),
601                                        session_state.execution_props(),
602                                    )
603                                })
604                                .collect::<DfResult<Vec<_>>>()?
605                        } else {
606                            // if user not assign order by, time index is needed as default ordering
607                            let time_index = create_physical_expr(
608                                &self.time_expr,
609                                input_dfschema.as_ref(),
610                                session_state.execution_props(),
611                            )?;
612                            vec![PhysicalSortExpr {
613                                expr: time_index,
614                                options: SortOptions {
615                                    descending: false,
616                                    nulls_first: false,
617                                },
618                            }]
619                        };
620                        let arg = self.create_physical_expr_list(
621                            false,
622                            &aggr.params.args,
623                            input_dfschema,
624                            session_state,
625                        )?;
626                        // first_value/last_value has only one param.
627                        // The param have been checked by datafusion in logical plan stage.
628                        // We can safely assume that there is only one element here.
629                        AggregateExprBuilder::new(aggr.func.clone(), arg)
630                            .schema(input_schema.clone())
631                            .order_by(order_by)
632                            .alias(name)
633                            .build()
634                    }
635                    Expr::AggregateFunction(aggr) => {
636                        let order_by = if !aggr.params.order_by.is_empty() {
637                            aggr.params
638                                .order_by
639                                .iter()
640                                .map(|x| {
641                                    create_physical_sort_expr(
642                                        x,
643                                        input_dfschema.as_ref(),
644                                        session_state.execution_props(),
645                                    )
646                                })
647                                .collect::<DfResult<Vec<_>>>()?
648                        } else {
649                            vec![]
650                        };
651                        let distinct = aggr.params.distinct;
652                        // TODO(discord9): add default null treatment?
653
654                        let input_phy_exprs = self.create_physical_expr_list(
655                            aggr.func.name() == "count",
656                            &aggr.params.args,
657                            input_dfschema,
658                            session_state,
659                        )?;
660                        AggregateExprBuilder::new(aggr.func.clone(), input_phy_exprs)
661                            .schema(input_schema.clone())
662                            .order_by(order_by)
663                            .with_distinct(distinct)
664                            .alias(name)
665                            .build()
666                    }
667                    _ => Err(DataFusionError::Plan(format!(
668                        "Unexpected Expr: {} in RangeSelect",
669                        range_fn.expr
670                    ))),
671                }?;
672                Ok(RangeFnExec {
673                    expr: Arc::new(expr),
674                    range: range_fn.range.as_millis() as Millisecond,
675                    fill: range_fn.fill.clone(),
676                    need_cast: if range_fn.need_cast {
677                        Some(range_fn.data_type.clone())
678                    } else {
679                        None
680                    },
681                })
682            })
683            .collect::<DfResult<Vec<_>>>()?;
684        let schema_before_project = Arc::new(Schema::new(fields));
685        let schema = if let Some(project) = &self.schema_project {
686            Arc::new(schema_before_project.project(project)?)
687        } else {
688            schema_before_project.clone()
689        };
690        let by = self.create_physical_expr_list(false, &self.by, input_dfschema, session_state)?;
691        let cache = PlanProperties::new(
692            EquivalenceProperties::new(schema.clone()),
693            Partitioning::UnknownPartitioning(1),
694            EmissionType::Incremental,
695            Boundedness::Bounded,
696        );
697        Ok(Arc::new(RangeSelectExec {
698            input: exec_input,
699            range_exec,
700            align: self.align.as_millis() as Millisecond,
701            align_to: self.align_to,
702            by,
703            time_index: self.time_index.clone(),
704            schema,
705            by_schema: Arc::new(Schema::new(by_fields)),
706            metric: ExecutionPlanMetricsSet::new(),
707            schema_before_project,
708            schema_project: self.schema_project.clone(),
709            cache,
710        }))
711    }
712}
713
714/// Range function expression.
715#[derive(Debug, Clone)]
716struct RangeFnExec {
717    expr: Arc<AggregateFunctionExpr>,
718    range: Millisecond,
719    fill: Option<Fill>,
720    need_cast: Option<DataType>,
721}
722
723impl RangeFnExec {
724    /// Returns the expressions to pass to the aggregator.
725    /// It also adds the order by expressions to the list of expressions.
726    /// Order-sensitive aggregators, such as `FIRST_VALUE(x ORDER BY y)` requires this.
727    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
728        let mut exprs = self.expr.expressions();
729        exprs.extend(self.expr.order_bys().iter().map(|sort| sort.expr.clone()));
730        exprs
731    }
732}
733
734impl Display for RangeFnExec {
735    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
736        if let Some(fill) = &self.fill {
737            write!(
738                f,
739                "{} RANGE {}s FILL {}",
740                self.expr.name(),
741                self.range / 1000,
742                fill
743            )
744        } else {
745            write!(f, "{} RANGE {}s", self.expr.name(), self.range / 1000)
746        }
747    }
748}
749
750#[derive(Debug)]
751pub struct RangeSelectExec {
752    input: Arc<dyn ExecutionPlan>,
753    range_exec: Vec<RangeFnExec>,
754    align: Millisecond,
755    align_to: i64,
756    time_index: String,
757    by: Vec<Arc<dyn PhysicalExpr>>,
758    schema: SchemaRef,
759    by_schema: SchemaRef,
760    metric: ExecutionPlanMetricsSet,
761    schema_project: Option<Vec<usize>>,
762    schema_before_project: SchemaRef,
763    cache: PlanProperties,
764}
765
766impl DisplayAs for RangeSelectExec {
767    fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
768        match t {
769            DisplayFormatType::Default
770            | DisplayFormatType::Verbose
771            | DisplayFormatType::TreeRender => {
772                write!(f, "RangeSelectExec: ")?;
773                let range_expr_strs: Vec<String> =
774                    self.range_exec.iter().map(RangeFnExec::to_string).collect();
775                let by: Vec<String> = self.by.iter().map(|e| e.to_string()).collect();
776                write!(
777                    f,
778                    "range_expr=[{}], align={}ms, align_to={}ms, align_by=[{}], time_index={}",
779                    range_expr_strs.join(", "),
780                    self.align,
781                    self.align_to,
782                    by.join(", "),
783                    self.time_index,
784                )?;
785            }
786        }
787        Ok(())
788    }
789}
790
791impl ExecutionPlan for RangeSelectExec {
792    fn as_any(&self) -> &dyn std::any::Any {
793        self
794    }
795
796    fn schema(&self) -> SchemaRef {
797        self.schema.clone()
798    }
799
800    fn required_input_distribution(&self) -> Vec<Distribution> {
801        vec![Distribution::SinglePartition]
802    }
803
804    fn properties(&self) -> &PlanProperties {
805        &self.cache
806    }
807
808    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
809        vec![&self.input]
810    }
811
812    fn with_new_children(
813        self: Arc<Self>,
814        children: Vec<Arc<dyn ExecutionPlan>>,
815    ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
816        assert!(!children.is_empty());
817        Ok(Arc::new(Self {
818            input: children[0].clone(),
819            range_exec: self.range_exec.clone(),
820            time_index: self.time_index.clone(),
821            by: self.by.clone(),
822            align: self.align,
823            align_to: self.align_to,
824            schema: self.schema.clone(),
825            by_schema: self.by_schema.clone(),
826            metric: self.metric.clone(),
827            schema_before_project: self.schema_before_project.clone(),
828            schema_project: self.schema_project.clone(),
829            cache: self.cache.clone(),
830        }))
831    }
832
833    fn execute(
834        &self,
835        partition: usize,
836        context: Arc<TaskContext>,
837    ) -> DfResult<DfSendableRecordBatchStream> {
838        let baseline_metric = BaselineMetrics::new(&self.metric, partition);
839        let input = self.input.execute(partition, context)?;
840        let schema = input.schema();
841        let time_index = schema
842            .column_with_name(&self.time_index)
843            .ok_or(DataFusionError::Execution(
844                "time index column not found".into(),
845            ))?
846            .0;
847        let row_converter = RowConverter::new(
848            self.by_schema
849                .fields()
850                .iter()
851                .map(|f| SortField::new(f.data_type().clone()))
852                .collect(),
853        )?;
854        Ok(Box::pin(RangeSelectStream {
855            schema: self.schema.clone(),
856            range_exec: self.range_exec.clone(),
857            input,
858            random_state: RandomState::new(),
859            time_index,
860            align: self.align,
861            align_to: self.align_to,
862            by: self.by.clone(),
863            series_map: HashMap::new(),
864            exec_state: ExecutionState::ReadingInput,
865            num_not_null_rows: 0,
866            row_converter,
867            modify_map: HashMap::new(),
868            metric: baseline_metric,
869            schema_project: self.schema_project.clone(),
870            schema_before_project: self.schema_before_project.clone(),
871        }))
872    }
873
874    fn metrics(&self) -> Option<MetricsSet> {
875        Some(self.metric.clone_inner())
876    }
877
878    fn name(&self) -> &str {
879        "RanegSelectExec"
880    }
881}
882
/// Stream created by `RangeSelectExec::execute`.
///
/// It consumes the whole input, folding every batch into per-series
/// accumulators, and then emits a single output batch.
struct RangeSelectStream {
    /// the schema of output column
    schema: SchemaRef,
    /// Range functions to evaluate, in output column order
    range_exec: Vec<RangeFnExec>,
    /// Input stream the batches are read from
    input: SendableRecordBatchStream,
    /// Column index of TIME INDEX column's position in the input schema
    time_index: usize,
    /// the unit of `align` is millisecond
    align: Millisecond,
    /// Time origin that aligned timestamps are computed from
    align_to: i64,
    /// Expressions whose values identify a time series
    by: Vec<Arc<dyn PhysicalExpr>>,
    /// Current phase of the stream (reading input, producing output, done)
    exec_state: ExecutionState,
    /// Converter for the by values
    row_converter: RowConverter,
    /// Hasher state used to hash the by values of each row
    random_state: RandomState,
    /// key: time series's hash value
    /// value: time series's state on different align_ts
    series_map: HashMap<u64, SeriesState>,
    /// key: `(hash of by rows, align_ts)`
    /// value: `[row_ids]`
    /// It is used to record the data that needs to be aggregated in each time slot during the data update process
    modify_map: HashMap<(u64, Millisecond), Vec<u32>>,
    /// Number of `(series, align_ts)` slots created so far.
    /// Used as a capacity hint when building the output; rows added by filling are not counted.
    num_not_null_rows: usize,
    /// Baseline metrics; its compute timer wraps both the update and the output phase
    metric: BaselineMetrics,
    /// Optional column indices used to project the output batch
    schema_project: Option<Vec<usize>>,
    /// Schema of the output batch before `schema_project` is applied
    schema_before_project: SchemaRef,
}
911
/// Aggregation state of a single time series (one distinct hash of by values).
#[derive(Debug)]
struct SeriesState {
    /// by values of this series, encoded by the stream's `RowConverter`
    row: OwnedRow,
    /// key: align_ts
    /// value: one accumulator per range function, following the order of `range_exec`
    align_ts_accumulator: BTreeMap<Millisecond, Vec<Box<dyn Accumulator>>>,
}
920
921/// Use `align_to` as time origin.
922/// According to `align` as time interval, produces aligned time.
923/// Combining the parameters related to the range query,
924/// determine for each `Accumulator` `(hash, align_ts)` define,
925/// which rows of data will be applied to it.
926fn produce_align_time(
927    align_to: i64,
928    range: Millisecond,
929    align: Millisecond,
930    ts_column: &TimestampMillisecondArray,
931    by_columns_hash: &[u64],
932    modify_map: &mut HashMap<(u64, Millisecond), Vec<u32>>,
933) {
934    modify_map.clear();
935    // make modify_map for range_fn[i]
936    for (row, hash) in by_columns_hash.iter().enumerate() {
937        let ts = ts_column.value(row);
938        let ith_slot = (ts - align_to).div_floor(align);
939        let mut align_ts = ith_slot * align + align_to;
940        while align_ts <= ts && ts < align_ts + range {
941            modify_map
942                .entry((*hash, align_ts))
943                .or_default()
944                .push(row as u32);
945            align_ts -= align;
946        }
947    }
948}
949
950fn cast_scalar_values(values: &mut [ScalarValue], data_type: &DataType) -> DfResult<()> {
951    let array = ScalarValue::iter_to_array(values.to_vec())?;
952    let cast_array = cast_with_options(&array, data_type, &CastOptions::default())?;
953    for (i, value) in values.iter_mut().enumerate() {
954        *value = ScalarValue::try_from_array(&cast_array, i)?;
955    }
956    Ok(())
957}
958
impl RangeSelectStream {
    /// Evaluates every expression in `exprs` against `batch` and turns each
    /// result into an array with `batch.num_rows()` rows.
    ///
    /// Returns the first evaluation or conversion error encountered.
    fn evaluate_many(
        &self,
        batch: &RecordBatch,
        exprs: &[Arc<dyn PhysicalExpr>],
    ) -> DfResult<Vec<ArrayRef>> {
        exprs
            .iter()
            .map(|expr| {
                let value = expr.evaluate(batch)?;
                value.into_array(batch.num_rows())
            })
            .collect::<DfResult<Vec<_>>>()
    }

    /// Folds one input batch into the accumulators stored in `series_map`.
    ///
    /// Rows are hashed by their by values. For each range function the rows are
    /// grouped per `(hash, align_ts)` slot (see `produce_align_time`), and each
    /// group is pushed into that slot's accumulator with one `update_batch` call.
    ///
    /// NOTE(review): series are keyed by the hash alone, so two different by
    /// value tuples with the same hash would share one `SeriesState` — confirm
    /// this is acceptable.
    fn update_range_context(&mut self, batch: RecordBatch) -> DfResult<()> {
        let _timer = self.metric.elapsed_compute().timer();
        let num_rows = batch.num_rows();
        let by_arrays = self.evaluate_many(&batch, &self.by)?;
        let mut hashes = vec![0; num_rows];
        create_hashes(&by_arrays, &self.random_state, &mut hashes)?;
        let by_rows = self.row_converter.convert_columns(&by_arrays)?;
        let mut ts_column = batch.column(self.time_index).clone();
        // Normalize the time index to millisecond precision before aligning.
        if !matches!(
            ts_column.data_type(),
            DataType::Timestamp(TimeUnit::Millisecond, _)
        ) {
            ts_column = compute::cast(
                ts_column.as_ref(),
                &DataType::Timestamp(TimeUnit::Millisecond, None),
            )?;
        }
        let ts_column_ref = ts_column
            .as_any()
            .downcast_ref::<TimestampMillisecondArray>()
            .ok_or_else(|| {
                DataFusionError::Execution(
                    "Time index Column downcast to TimestampMillisecondArray failed".into(),
                )
            })?;
        for i in 0..self.range_exec.len() {
            let args = self.evaluate_many(&batch, &self.range_exec[i].expressions())?;
            // Fill self.modify_map with (hash, align_ts) => [row_nums] for this range function
            produce_align_time(
                self.align_to,
                self.range_exec[i].range,
                self.align,
                ts_column_ref,
                &hashes,
                &mut self.modify_map,
            );
            // build modify_rows/modify_index/offsets for batch update
            let mut modify_rows = UInt32Builder::with_capacity(0);
            // Each entry is (hash, align_ts, row_num).
            // row_num is only used to look up the by values of the series, so the
            // first row of the group is enough: all rows of a group share the same by values.
            let mut modify_index = Vec::with_capacity(self.modify_map.len());
            // offsets[k]..offsets[k + 1] is the range of the k-th group inside modify_rows
            let mut offsets = vec![0];
            let mut offset_so_far = 0;
            for ((hash, ts), modify) in &self.modify_map {
                modify_rows.append_slice(modify);
                offset_so_far += modify.len();
                offsets.push(offset_so_far);
                modify_index.push((*hash, *ts, modify[0]));
            }
            let modify_rows = modify_rows.finish();
            // Gather the argument rows of all groups once, then slice per group below.
            let args = take_arrays(&args, &modify_rows, None)?;
            modify_index.iter().zip(offsets.windows(2)).try_for_each(
                |((hash, ts, row), offset)| {
                    let (offset, length) = (offset[0], offset[1] - offset[0]);
                    let sliced_arrays: Vec<ArrayRef> = args
                        .iter()
                        .map(|array| array.slice(offset, length))
                        .collect();
                    let accumulators_map =
                        self.series_map.entry(*hash).or_insert_with(|| SeriesState {
                            row: by_rows.row(*row as usize).owned(),
                            align_ts_accumulator: BTreeMap::new(),
                        });
                    match accumulators_map.align_ts_accumulator.entry(*ts) {
                        Entry::Occupied(mut e) => {
                            let accumulators = e.get_mut();
                            accumulators[i].update_batch(&sliced_arrays)
                        }
                        Entry::Vacant(e) => {
                            // First data for this slot: create one accumulator per
                            // range function, then update only the i-th one.
                            self.num_not_null_rows += 1;
                            let mut accumulators = self
                                .range_exec
                                .iter()
                                .map(|range| range.expr.create_accumulator())
                                .collect::<DfResult<Vec<_>>>()?;
                            let result = accumulators[i].update_batch(&sliced_arrays);
                            e.insert(accumulators);
                            result
                        }
                    }
                },
            )?;
        }
        Ok(())
    }

    /// Builds the output batch from the accumulated state.
    ///
    /// Columns are laid out as range expressions, then the time index, then the
    /// by columns (`schema_before_project`); `schema_project`, when set, is
    /// applied afterwards. An empty `series_map` yields an empty batch.
    fn generate_output(&mut self) -> DfResult<RecordBatch> {
        let _timer = self.metric.elapsed_compute().timer();
        if self.series_map.is_empty() {
            return Ok(RecordBatch::new_empty(self.schema.clone()));
        }
        // 1 for time index column
        let mut columns: Vec<Arc<dyn Array>> =
            Vec::with_capacity(1 + self.range_exec.len() + self.by.len());
        let mut ts_builder = TimestampMillisecondBuilder::with_capacity(self.num_not_null_rows);
        // One vector of scalars per range function, shared by all series.
        let mut all_scalar =
            vec![Vec::with_capacity(self.num_not_null_rows); self.range_exec.len()];
        let mut by_rows = Vec::with_capacity(self.num_not_null_rows);
        // Offset in `all_scalar[i]` where the current series starts.
        let mut start_index = 0;
        // If any range expr needs fill, both the missing align_ts and the null values must be filled.
        let need_fill_output = self.range_exec.iter().any(|range| range.fill.is_some());
        // The padding value for each range function: what a fresh, never-updated accumulator evaluates to
        let padding_values = self
            .range_exec
            .iter()
            .map(|e| e.expr.create_accumulator()?.evaluate())
            .collect::<DfResult<Vec<_>>>()?;
        for SeriesState {
            row,
            align_ts_accumulator,
        } in self.series_map.values_mut()
        {
            // skip empty time series
            if align_ts_accumulator.is_empty() {
                continue;
            }
            // find the first and last align_ts
            let begin_ts = *align_ts_accumulator.first_key_value().unwrap().0;
            let end_ts = *align_ts_accumulator.last_key_value().unwrap().0;
            let align_ts = if need_fill_output {
                // also emit the align_ts between first and last that have no data in their slot
                (begin_ts..=end_ts).step_by(self.align as usize).collect()
            } else {
                align_ts_accumulator.keys().copied().collect::<Vec<_>>()
            };
            for ts in &align_ts {
                if let Some(slot) = align_ts_accumulator.get_mut(ts) {
                    for (column, acc) in all_scalar.iter_mut().zip(slot.iter_mut()) {
                        column.push(acc.evaluate()?);
                    }
                } else {
                    // pad the empty time slot with the padding value of each range function
                    for (column, padding) in all_scalar.iter_mut().zip(padding_values.iter()) {
                        column.push(padding.clone())
                    }
                }
            }
            ts_builder.append_slice(&align_ts);
            // apply cast and fill strategy on the values of this time series only
            for (
                i,
                RangeFnExec {
                    fill, need_cast, ..
                },
            ) in self.range_exec.iter().enumerate()
            {
                let time_series_data =
                    &mut all_scalar[i][start_index..start_index + align_ts.len()];
                if let Some(data_type) = need_cast {
                    cast_scalar_values(time_series_data, data_type)?;
                }
                if let Some(fill) = fill {
                    fill.apply_fill_strategy(&align_ts, time_series_data)?;
                }
            }
            // repeat the by values of the series once per emitted row
            by_rows.resize(by_rows.len() + align_ts.len(), row.row());
            start_index += align_ts.len();
        }
        for column_scalar in all_scalar {
            columns.push(ScalarValue::iter_to_array(column_scalar)?);
        }
        let ts_column = ts_builder.finish();
        // The output schema before projection follows the order: range exprs | time index | by columns,
        // so `columns.len()` is the position of the time index field at this point.
        let ts_column = compute::cast(
            &ts_column,
            self.schema_before_project.field(columns.len()).data_type(),
        )?;
        columns.push(ts_column);
        columns.extend(self.row_converter.convert_rows(by_rows)?);
        let output = RecordBatch::try_new(self.schema_before_project.clone(), columns)?;
        let project_output = if let Some(project) = &self.schema_project {
            output.project(project)?
        } else {
            output
        };
        Ok(project_output)
    }
}
1153
/// Phase of a `RangeSelectStream`.
enum ExecutionState {
    /// Input batches are still being read and folded into the accumulators.
    ReadingInput,
    /// The input is exhausted; the next poll builds the output batch.
    ProducingOutput,
    /// The output batch has been emitted; the stream yields `None`.
    Done,
}
1159
1160impl RecordBatchStream for RangeSelectStream {
1161    fn schema(&self) -> SchemaRef {
1162        self.schema.clone()
1163    }
1164}
1165
1166impl Stream for RangeSelectStream {
1167    type Item = DataFusionResult<RecordBatch>;
1168
1169    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1170        loop {
1171            match self.exec_state {
1172                ExecutionState::ReadingInput => {
1173                    match ready!(self.input.poll_next_unpin(cx)) {
1174                        // new batch to aggregate
1175                        Some(Ok(batch)) => {
1176                            if let Err(e) = self.update_range_context(batch) {
1177                                common_telemetry::debug!(
1178                                    "RangeSelectStream cannot update range context, schema: {:?}, err: {:?}", self.schema, e
1179                                );
1180                                return Poll::Ready(Some(Err(e)));
1181                            }
1182                        }
1183                        // inner had error, return to caller
1184                        Some(Err(e)) => return Poll::Ready(Some(Err(e))),
1185                        // inner is done, producing output
1186                        None => {
1187                            self.exec_state = ExecutionState::ProducingOutput;
1188                        }
1189                    }
1190                }
1191                ExecutionState::ProducingOutput => {
1192                    let result = self.generate_output();
1193                    return match result {
1194                        // made output
1195                        Ok(batch) => {
1196                            self.exec_state = ExecutionState::Done;
1197                            Poll::Ready(Some(Ok(batch)))
1198                        }
1199                        // error making output
1200                        Err(error) => Poll::Ready(Some(Err(error))),
1201                    };
1202                }
1203                ExecutionState::Done => return Poll::Ready(None),
1204            }
1205        }
1206    }
1207}
1208
1209#[cfg(test)]
1210mod test {
    /// Builds an arrow array from a comma separated list of literals and
    /// `null`s, e.g. `nullable_array!(Int64; 0, null, 1)`.
    ///
    /// The type name before `;` is combined with `Builder` to pick the builder
    /// in `arrow::array` (`Int64` uses `Int64Builder`).
    macro_rules! nullable_array {
        // Nothing left after the builder name: the item list is exhausted.
        ($builder:ident,) => {
        };
        // Entry point: create the builder, append all items, finish the array.
        ($array_type:ident ; $($tail:tt)*) => {
            paste::item! {
                {
                    let mut builder = arrow::array::[<$array_type Builder>]::new();
                    nullable_array!(builder, $($tail)*);
                    builder.finish()
                }
            }
        };
        // Last item is `null`.
        ($builder:ident, null) => {
            $builder.append_null();
        };
        // `null` followed by more items.
        ($builder:ident, null, $($tail:tt)*) => {
            $builder.append_null();
            nullable_array!($builder, $($tail)*);
        };
        // Last item is a literal value.
        ($builder:ident, $value:literal) => {
            $builder.append_value($value);
        };
        // Literal value followed by more items.
        ($builder:ident, $value:literal, $($tail:tt)*) => {
            $builder.append_value($value);
            nullable_array!($builder, $($tail)*);
        };
    }
1238
1239    use std::sync::Arc;
1240
1241    use arrow_schema::SortOptions;
1242    use datafusion::arrow::datatypes::{
1243        ArrowPrimitiveType, DataType, Field, Schema, TimestampMillisecondType,
1244    };
1245    use datafusion::datasource::memory::MemorySourceConfig;
1246    use datafusion::datasource::source::DataSourceExec;
1247    use datafusion::functions_aggregate::min_max;
1248    use datafusion::physical_plan::sorts::sort::SortExec;
1249    use datafusion::prelude::SessionContext;
1250    use datafusion_physical_expr::expressions::Column;
1251    use datafusion_physical_expr::PhysicalSortExpr;
1252    use datatypes::arrow::array::TimestampMillisecondArray;
1253    use datatypes::arrow_array::StringArray;
1254
1255    use super::*;
1256
    /// Name of the time index column in the test input and output schemas.
    const TIME_INDEX_COLUMN: &str = "timestamp";
1258
1259    fn prepare_test_data(is_float: bool, is_gap: bool) -> DataSourceExec {
1260        let schema = Arc::new(Schema::new(vec![
1261            Field::new(TIME_INDEX_COLUMN, TimestampMillisecondType::DATA_TYPE, true),
1262            Field::new(
1263                "value",
1264                if is_float {
1265                    DataType::Float64
1266                } else {
1267                    DataType::Int64
1268                },
1269                true,
1270            ),
1271            Field::new("host", DataType::Utf8, true),
1272        ]));
1273        let timestamp_column: Arc<dyn Array> = if !is_gap {
1274            Arc::new(TimestampMillisecondArray::from(vec![
1275                0, 5_000, 10_000, 15_000, 20_000, // host 1 every 5s
1276                0, 5_000, 10_000, 15_000, 20_000, // host 2 every 5s
1277            ])) as _
1278        } else {
1279            Arc::new(TimestampMillisecondArray::from(vec![
1280                0, 15_000, // host 1 every 5s, missing data on 5_000, 10_000
1281                0, 15_000, // host 2 every 5s, missing data on 5_000, 10_000
1282            ])) as _
1283        };
1284        let mut host = vec!["host1"; timestamp_column.len() / 2];
1285        host.extend(vec!["host2"; timestamp_column.len() / 2]);
1286        let mut value_column: Arc<dyn Array> = if is_gap {
1287            Arc::new(nullable_array!(Int64;
1288                0, 6, // data for host 1
1289                6, 12 // data for host 2
1290            )) as _
1291        } else {
1292            Arc::new(nullable_array!(Int64;
1293                0, null, 1, null, 2, // data for host 1
1294                3, null, 4, null, 5 // data for host 2
1295            )) as _
1296        };
1297        if is_float {
1298            value_column =
1299                cast_with_options(&value_column, &DataType::Float64, &CastOptions::default())
1300                    .unwrap();
1301        }
1302        let host_column: Arc<dyn Array> = Arc::new(StringArray::from(host)) as _;
1303        let data = RecordBatch::try_new(
1304            schema.clone(),
1305            vec![timestamp_column, value_column, host_column],
1306        )
1307        .unwrap();
1308
1309        DataSourceExec::new(Arc::new(
1310            MemorySourceConfig::try_new(&[vec![data]], schema, None).unwrap(),
1311        ))
1312    }
1313
1314    async fn do_range_select_test(
1315        range1: Millisecond,
1316        range2: Millisecond,
1317        align: Millisecond,
1318        fill: Option<Fill>,
1319        is_float: bool,
1320        is_gap: bool,
1321        expected: String,
1322    ) {
1323        let data_type = if is_float {
1324            DataType::Float64
1325        } else {
1326            DataType::Int64
1327        };
1328        let (need_cast, schema_data_type) = if !is_float && matches!(fill, Some(Fill::Linear)) {
1329            // data_type = DataType::Float64;
1330            (Some(DataType::Float64), DataType::Float64)
1331        } else {
1332            (None, data_type.clone())
1333        };
1334        let memory_exec = Arc::new(prepare_test_data(is_float, is_gap));
1335        let schema = Arc::new(Schema::new(vec![
1336            Field::new("MIN(value)", schema_data_type.clone(), true),
1337            Field::new("MAX(value)", schema_data_type, true),
1338            Field::new(TIME_INDEX_COLUMN, TimestampMillisecondType::DATA_TYPE, true),
1339            Field::new("host", DataType::Utf8, true),
1340        ]));
1341        let cache = PlanProperties::new(
1342            EquivalenceProperties::new(schema.clone()),
1343            Partitioning::UnknownPartitioning(1),
1344            EmissionType::Incremental,
1345            Boundedness::Bounded,
1346        );
1347        let input_schema = memory_exec.schema().clone();
1348        let range_select_exec = Arc::new(RangeSelectExec {
1349            input: memory_exec,
1350            range_exec: vec![
1351                RangeFnExec {
1352                    expr: Arc::new(
1353                        AggregateExprBuilder::new(
1354                            min_max::min_udaf(),
1355                            vec![Arc::new(Column::new("value", 1))],
1356                        )
1357                        .schema(input_schema.clone())
1358                        .alias("MIN(value)")
1359                        .build()
1360                        .unwrap(),
1361                    ),
1362                    range: range1,
1363                    fill: fill.clone(),
1364                    need_cast: need_cast.clone(),
1365                },
1366                RangeFnExec {
1367                    expr: Arc::new(
1368                        AggregateExprBuilder::new(
1369                            min_max::max_udaf(),
1370                            vec![Arc::new(Column::new("value", 1))],
1371                        )
1372                        .schema(input_schema.clone())
1373                        .alias("MAX(value)")
1374                        .build()
1375                        .unwrap(),
1376                    ),
1377                    range: range2,
1378                    fill,
1379                    need_cast,
1380                },
1381            ],
1382            align,
1383            align_to: 0,
1384            by: vec![Arc::new(Column::new("host", 2))],
1385            time_index: TIME_INDEX_COLUMN.to_string(),
1386            schema: schema.clone(),
1387            schema_before_project: schema.clone(),
1388            schema_project: None,
1389            by_schema: Arc::new(Schema::new(vec![Field::new("host", DataType::Utf8, true)])),
1390            metric: ExecutionPlanMetricsSet::new(),
1391            cache,
1392        });
1393        let sort_exec = SortExec::new(
1394            [
1395                PhysicalSortExpr {
1396                    expr: Arc::new(Column::new("host", 3)),
1397                    options: SortOptions {
1398                        descending: false,
1399                        nulls_first: true,
1400                    },
1401                },
1402                PhysicalSortExpr {
1403                    expr: Arc::new(Column::new(TIME_INDEX_COLUMN, 2)),
1404                    options: SortOptions {
1405                        descending: false,
1406                        nulls_first: true,
1407                    },
1408                },
1409            ]
1410            .into(),
1411            range_select_exec,
1412        );
1413        let session_context = SessionContext::default();
1414        let result =
1415            datafusion::physical_plan::collect(Arc::new(sort_exec), session_context.task_ctx())
1416                .await
1417                .unwrap();
1418
1419        let result_literal = arrow::util::pretty::pretty_format_batches(&result)
1420            .unwrap()
1421            .to_string();
1422
1423        assert_eq!(result_literal, expected);
1424    }
1425
1426    #[tokio::test]
1427    async fn range_10s_align_1000s() {
1428        let expected = String::from(
1429            "+------------+------------+---------------------+-------+\
1430            \n| MIN(value) | MAX(value) | timestamp           | host  |\
1431            \n+------------+------------+---------------------+-------+\
1432            \n| 0.0        | 0.0        | 1970-01-01T00:00:00 | host1 |\
1433            \n| 3.0        | 3.0        | 1970-01-01T00:00:00 | host2 |\
1434            \n+------------+------------+---------------------+-------+",
1435        );
1436        do_range_select_test(
1437            10_000,
1438            10_000,
1439            1_000_000,
1440            Some(Fill::Null),
1441            true,
1442            false,
1443            expected,
1444        )
1445        .await;
1446    }
1447
1448    #[tokio::test]
1449    async fn range_fill_null() {
1450        let expected = String::from(
1451            "+------------+------------+---------------------+-------+\
1452            \n| MIN(value) | MAX(value) | timestamp           | host  |\
1453            \n+------------+------------+---------------------+-------+\
1454            \n| 0.0        |            | 1969-12-31T23:59:55 | host1 |\
1455            \n| 0.0        | 0.0        | 1970-01-01T00:00:00 | host1 |\
1456            \n| 1.0        |            | 1970-01-01T00:00:05 | host1 |\
1457            \n| 1.0        | 1.0        | 1970-01-01T00:00:10 | host1 |\
1458            \n| 2.0        |            | 1970-01-01T00:00:15 | host1 |\
1459            \n| 2.0        | 2.0        | 1970-01-01T00:00:20 | host1 |\
1460            \n| 3.0        |            | 1969-12-31T23:59:55 | host2 |\
1461            \n| 3.0        | 3.0        | 1970-01-01T00:00:00 | host2 |\
1462            \n| 4.0        |            | 1970-01-01T00:00:05 | host2 |\
1463            \n| 4.0        | 4.0        | 1970-01-01T00:00:10 | host2 |\
1464            \n| 5.0        |            | 1970-01-01T00:00:15 | host2 |\
1465            \n| 5.0        | 5.0        | 1970-01-01T00:00:20 | host2 |\
1466            \n+------------+------------+---------------------+-------+",
1467        );
1468        do_range_select_test(
1469            10_000,
1470            5_000,
1471            5_000,
1472            Some(Fill::Null),
1473            true,
1474            false,
1475            expected,
1476        )
1477        .await;
1478    }
1479
1480    #[tokio::test]
1481    async fn range_fill_prev() {
1482        let expected = String::from(
1483            "+------------+------------+---------------------+-------+\
1484            \n| MIN(value) | MAX(value) | timestamp           | host  |\
1485            \n+------------+------------+---------------------+-------+\
1486            \n| 0.0        |            | 1969-12-31T23:59:55 | host1 |\
1487            \n| 0.0        | 0.0        | 1970-01-01T00:00:00 | host1 |\
1488            \n| 1.0        | 0.0        | 1970-01-01T00:00:05 | host1 |\
1489            \n| 1.0        | 1.0        | 1970-01-01T00:00:10 | host1 |\
1490            \n| 2.0        | 1.0        | 1970-01-01T00:00:15 | host1 |\
1491            \n| 2.0        | 2.0        | 1970-01-01T00:00:20 | host1 |\
1492            \n| 3.0        |            | 1969-12-31T23:59:55 | host2 |\
1493            \n| 3.0        | 3.0        | 1970-01-01T00:00:00 | host2 |\
1494            \n| 4.0        | 3.0        | 1970-01-01T00:00:05 | host2 |\
1495            \n| 4.0        | 4.0        | 1970-01-01T00:00:10 | host2 |\
1496            \n| 5.0        | 4.0        | 1970-01-01T00:00:15 | host2 |\
1497            \n| 5.0        | 5.0        | 1970-01-01T00:00:20 | host2 |\
1498            \n+------------+------------+---------------------+-------+",
1499        );
1500        do_range_select_test(
1501            10_000,
1502            5_000,
1503            5_000,
1504            Some(Fill::Prev),
1505            true,
1506            false,
1507            expected,
1508        )
1509        .await;
1510    }
1511
1512    #[tokio::test]
1513    async fn range_fill_linear() {
1514        let expected = String::from(
1515            "+------------+------------+---------------------+-------+\
1516            \n| MIN(value) | MAX(value) | timestamp           | host  |\
1517            \n+------------+------------+---------------------+-------+\
1518            \n| 0.0        | -0.5       | 1969-12-31T23:59:55 | host1 |\
1519            \n| 0.0        | 0.0        | 1970-01-01T00:00:00 | host1 |\
1520            \n| 1.0        | 0.5        | 1970-01-01T00:00:05 | host1 |\
1521            \n| 1.0        | 1.0        | 1970-01-01T00:00:10 | host1 |\
1522            \n| 2.0        | 1.5        | 1970-01-01T00:00:15 | host1 |\
1523            \n| 2.0        | 2.0        | 1970-01-01T00:00:20 | host1 |\
1524            \n| 3.0        | 2.5        | 1969-12-31T23:59:55 | host2 |\
1525            \n| 3.0        | 3.0        | 1970-01-01T00:00:00 | host2 |\
1526            \n| 4.0        | 3.5        | 1970-01-01T00:00:05 | host2 |\
1527            \n| 4.0        | 4.0        | 1970-01-01T00:00:10 | host2 |\
1528            \n| 5.0        | 4.5        | 1970-01-01T00:00:15 | host2 |\
1529            \n| 5.0        | 5.0        | 1970-01-01T00:00:20 | host2 |\
1530            \n+------------+------------+---------------------+-------+",
1531        );
1532        do_range_select_test(
1533            10_000,
1534            5_000,
1535            5_000,
1536            Some(Fill::Linear),
1537            true,
1538            false,
1539            expected,
1540        )
1541        .await;
1542    }
1543
1544    #[tokio::test]
1545    async fn range_fill_integer_linear() {
1546        let expected = String::from(
1547            "+------------+------------+---------------------+-------+\
1548            \n| MIN(value) | MAX(value) | timestamp           | host  |\
1549            \n+------------+------------+---------------------+-------+\
1550            \n| 0.0        | -0.5       | 1969-12-31T23:59:55 | host1 |\
1551            \n| 0.0        | 0.0        | 1970-01-01T00:00:00 | host1 |\
1552            \n| 1.0        | 0.5        | 1970-01-01T00:00:05 | host1 |\
1553            \n| 1.0        | 1.0        | 1970-01-01T00:00:10 | host1 |\
1554            \n| 2.0        | 1.5        | 1970-01-01T00:00:15 | host1 |\
1555            \n| 2.0        | 2.0        | 1970-01-01T00:00:20 | host1 |\
1556            \n| 3.0        | 2.5        | 1969-12-31T23:59:55 | host2 |\
1557            \n| 3.0        | 3.0        | 1970-01-01T00:00:00 | host2 |\
1558            \n| 4.0        | 3.5        | 1970-01-01T00:00:05 | host2 |\
1559            \n| 4.0        | 4.0        | 1970-01-01T00:00:10 | host2 |\
1560            \n| 5.0        | 4.5        | 1970-01-01T00:00:15 | host2 |\
1561            \n| 5.0        | 5.0        | 1970-01-01T00:00:20 | host2 |\
1562            \n+------------+------------+---------------------+-------+",
1563        );
1564        do_range_select_test(
1565            10_000,
1566            5_000,
1567            5_000,
1568            Some(Fill::Linear),
1569            false,
1570            false,
1571            expected,
1572        )
1573        .await;
1574    }
1575
1576    #[tokio::test]
1577    async fn range_fill_const() {
1578        let expected = String::from(
1579            "+------------+------------+---------------------+-------+\
1580            \n| MIN(value) | MAX(value) | timestamp           | host  |\
1581            \n+------------+------------+---------------------+-------+\
1582            \n| 0.0        | 6.6        | 1969-12-31T23:59:55 | host1 |\
1583            \n| 0.0        | 0.0        | 1970-01-01T00:00:00 | host1 |\
1584            \n| 1.0        | 6.6        | 1970-01-01T00:00:05 | host1 |\
1585            \n| 1.0        | 1.0        | 1970-01-01T00:00:10 | host1 |\
1586            \n| 2.0        | 6.6        | 1970-01-01T00:00:15 | host1 |\
1587            \n| 2.0        | 2.0        | 1970-01-01T00:00:20 | host1 |\
1588            \n| 3.0        | 6.6        | 1969-12-31T23:59:55 | host2 |\
1589            \n| 3.0        | 3.0        | 1970-01-01T00:00:00 | host2 |\
1590            \n| 4.0        | 6.6        | 1970-01-01T00:00:05 | host2 |\
1591            \n| 4.0        | 4.0        | 1970-01-01T00:00:10 | host2 |\
1592            \n| 5.0        | 6.6        | 1970-01-01T00:00:15 | host2 |\
1593            \n| 5.0        | 5.0        | 1970-01-01T00:00:20 | host2 |\
1594            \n+------------+------------+---------------------+-------+",
1595        );
1596        do_range_select_test(
1597            10_000,
1598            5_000,
1599            5_000,
1600            Some(Fill::Const(ScalarValue::Float64(Some(6.6)))),
1601            true,
1602            false,
1603            expected,
1604        )
1605        .await;
1606    }
1607
1608    #[tokio::test]
1609    async fn range_fill_gap() {
1610        let expected = String::from(
1611            "+------------+------------+---------------------+-------+\
1612            \n| MIN(value) | MAX(value) | timestamp           | host  |\
1613            \n+------------+------------+---------------------+-------+\
1614            \n| 0.0        | 0.0        | 1970-01-01T00:00:00 | host1 |\
1615            \n| 6.0        | 6.0        | 1970-01-01T00:00:15 | host1 |\
1616            \n| 6.0        | 6.0        | 1970-01-01T00:00:00 | host2 |\
1617            \n| 12.0       | 12.0       | 1970-01-01T00:00:15 | host2 |\
1618            \n+------------+------------+---------------------+-------+",
1619        );
1620        do_range_select_test(5_000, 5_000, 5_000, None, true, true, expected).await;
1621        let expected = String::from(
1622            "+------------+------------+---------------------+-------+\
1623            \n| MIN(value) | MAX(value) | timestamp           | host  |\
1624            \n+------------+------------+---------------------+-------+\
1625            \n| 0.0        | 0.0        | 1970-01-01T00:00:00 | host1 |\
1626            \n|            |            | 1970-01-01T00:00:05 | host1 |\
1627            \n|            |            | 1970-01-01T00:00:10 | host1 |\
1628            \n| 6.0        | 6.0        | 1970-01-01T00:00:15 | host1 |\
1629            \n| 6.0        | 6.0        | 1970-01-01T00:00:00 | host2 |\
1630            \n|            |            | 1970-01-01T00:00:05 | host2 |\
1631            \n|            |            | 1970-01-01T00:00:10 | host2 |\
1632            \n| 12.0       | 12.0       | 1970-01-01T00:00:15 | host2 |\
1633            \n+------------+------------+---------------------+-------+",
1634        );
1635        do_range_select_test(5_000, 5_000, 5_000, Some(Fill::Null), true, true, expected).await;
1636        let expected = String::from(
1637            "+------------+------------+---------------------+-------+\
1638            \n| MIN(value) | MAX(value) | timestamp           | host  |\
1639            \n+------------+------------+---------------------+-------+\
1640            \n| 0.0        | 0.0        | 1970-01-01T00:00:00 | host1 |\
1641            \n| 0.0        | 0.0        | 1970-01-01T00:00:05 | host1 |\
1642            \n| 0.0        | 0.0        | 1970-01-01T00:00:10 | host1 |\
1643            \n| 6.0        | 6.0        | 1970-01-01T00:00:15 | host1 |\
1644            \n| 6.0        | 6.0        | 1970-01-01T00:00:00 | host2 |\
1645            \n| 6.0        | 6.0        | 1970-01-01T00:00:05 | host2 |\
1646            \n| 6.0        | 6.0        | 1970-01-01T00:00:10 | host2 |\
1647            \n| 12.0       | 12.0       | 1970-01-01T00:00:15 | host2 |\
1648            \n+------------+------------+---------------------+-------+",
1649        );
1650        do_range_select_test(5_000, 5_000, 5_000, Some(Fill::Prev), true, true, expected).await;
1651        let expected = String::from(
1652            "+------------+------------+---------------------+-------+\
1653            \n| MIN(value) | MAX(value) | timestamp           | host  |\
1654            \n+------------+------------+---------------------+-------+\
1655            \n| 0.0        | 0.0        | 1970-01-01T00:00:00 | host1 |\
1656            \n| 2.0        | 2.0        | 1970-01-01T00:00:05 | host1 |\
1657            \n| 4.0        | 4.0        | 1970-01-01T00:00:10 | host1 |\
1658            \n| 6.0        | 6.0        | 1970-01-01T00:00:15 | host1 |\
1659            \n| 6.0        | 6.0        | 1970-01-01T00:00:00 | host2 |\
1660            \n| 8.0        | 8.0        | 1970-01-01T00:00:05 | host2 |\
1661            \n| 10.0       | 10.0       | 1970-01-01T00:00:10 | host2 |\
1662            \n| 12.0       | 12.0       | 1970-01-01T00:00:15 | host2 |\
1663            \n+------------+------------+---------------------+-------+",
1664        );
1665        do_range_select_test(
1666            5_000,
1667            5_000,
1668            5_000,
1669            Some(Fill::Linear),
1670            true,
1671            true,
1672            expected,
1673        )
1674        .await;
1675        let expected = String::from(
1676            "+------------+------------+---------------------+-------+\
1677            \n| MIN(value) | MAX(value) | timestamp           | host  |\
1678            \n+------------+------------+---------------------+-------+\
1679            \n| 0.0        | 0.0        | 1970-01-01T00:00:00 | host1 |\
1680            \n| 6.0        | 6.0        | 1970-01-01T00:00:05 | host1 |\
1681            \n| 6.0        | 6.0        | 1970-01-01T00:00:10 | host1 |\
1682            \n| 6.0        | 6.0        | 1970-01-01T00:00:15 | host1 |\
1683            \n| 6.0        | 6.0        | 1970-01-01T00:00:00 | host2 |\
1684            \n| 6.0        | 6.0        | 1970-01-01T00:00:05 | host2 |\
1685            \n| 6.0        | 6.0        | 1970-01-01T00:00:10 | host2 |\
1686            \n| 12.0       | 12.0       | 1970-01-01T00:00:15 | host2 |\
1687            \n+------------+------------+---------------------+-------+",
1688        );
1689        do_range_select_test(
1690            5_000,
1691            5_000,
1692            5_000,
1693            Some(Fill::Const(ScalarValue::Float64(Some(6.0)))),
1694            true,
1695            true,
1696            expected,
1697        )
1698        .await;
1699    }
1700
1701    #[test]
1702    fn fill_test() {
1703        assert!(Fill::try_from_str("", &DataType::UInt8).unwrap().is_none());
1704        assert!(Fill::try_from_str("Linear", &DataType::UInt8).unwrap() == Some(Fill::Linear));
1705        assert_eq!(
1706            Fill::try_from_str("Linear", &DataType::Boolean)
1707                .unwrap_err()
1708                .to_string(),
1709            "Error during planning: Use FILL LINEAR on Non-numeric DataType Boolean"
1710        );
1711        assert_eq!(
1712            Fill::try_from_str("WHAT", &DataType::UInt8)
1713                .unwrap_err()
1714                .to_string(),
1715                "Error during planning: WHAT is not a valid fill option, fail to convert to a const value. { Arrow error: Cast error: Cannot cast string 'WHAT' to value of UInt8 type }"
1716        );
1717        assert_eq!(
1718            Fill::try_from_str("8.0", &DataType::UInt8)
1719                .unwrap_err()
1720                .to_string(),
1721                "Error during planning: 8.0 is not a valid fill option, fail to convert to a const value. { Arrow error: Cast error: Cannot cast string '8.0' to value of UInt8 type }"
1722        );
1723        assert!(
1724            Fill::try_from_str("8", &DataType::UInt8).unwrap()
1725                == Some(Fill::Const(ScalarValue::UInt8(Some(8))))
1726        );
1727        let mut test1 = vec![
1728            ScalarValue::UInt8(Some(8)),
1729            ScalarValue::UInt8(None),
1730            ScalarValue::UInt8(Some(9)),
1731        ];
1732        Fill::Null.apply_fill_strategy(&[], &mut test1).unwrap();
1733        assert_eq!(test1[1], ScalarValue::UInt8(None));
1734        Fill::Prev.apply_fill_strategy(&[], &mut test1).unwrap();
1735        assert_eq!(test1[1], ScalarValue::UInt8(Some(8)));
1736        test1[1] = ScalarValue::UInt8(None);
1737        Fill::Const(ScalarValue::UInt8(Some(10)))
1738            .apply_fill_strategy(&[], &mut test1)
1739            .unwrap();
1740        assert_eq!(test1[1], ScalarValue::UInt8(Some(10)));
1741    }
1742
1743    #[test]
1744    fn test_fill_linear() {
1745        let ts = vec![1, 2, 3, 4, 5];
1746        let mut test = vec![
1747            ScalarValue::Float32(Some(1.0)),
1748            ScalarValue::Float32(None),
1749            ScalarValue::Float32(Some(3.0)),
1750            ScalarValue::Float32(None),
1751            ScalarValue::Float32(Some(5.0)),
1752        ];
1753        Fill::Linear.apply_fill_strategy(&ts, &mut test).unwrap();
1754        let mut test1 = vec![
1755            ScalarValue::Float32(None),
1756            ScalarValue::Float32(Some(2.0)),
1757            ScalarValue::Float32(None),
1758            ScalarValue::Float32(Some(4.0)),
1759            ScalarValue::Float32(None),
1760        ];
1761        Fill::Linear.apply_fill_strategy(&ts, &mut test1).unwrap();
1762        assert_eq!(test, test1);
1763        // test linear interpolation on irregularly spaced ts/data
1764        let ts = vec![
1765            1,   // None
1766            3,   // 1.0
1767            8,   // 11.0
1768            30,  // None
1769            88,  // 10.0
1770            108, // 5.0
1771            128, // None
1772        ];
1773        let mut test = vec![
1774            ScalarValue::Float64(None),
1775            ScalarValue::Float64(Some(1.0)),
1776            ScalarValue::Float64(Some(11.0)),
1777            ScalarValue::Float64(None),
1778            ScalarValue::Float64(Some(10.0)),
1779            ScalarValue::Float64(Some(5.0)),
1780            ScalarValue::Float64(None),
1781        ];
1782        Fill::Linear.apply_fill_strategy(&ts, &mut test).unwrap();
1783        let data: Vec<_> = test
1784            .into_iter()
1785            .map(|x| {
1786                let ScalarValue::Float64(Some(f)) = x else {
1787                    unreachable!()
1788                };
1789                f
1790            })
1791            .collect();
1792        assert_eq!(data, vec![-3.0, 1.0, 11.0, 10.725, 10.0, 5.0, 0.0]);
1793        // test corner case
1794        let ts = vec![1];
1795        let test = vec![ScalarValue::Float32(None)];
1796        let mut test1 = test.clone();
1797        Fill::Linear.apply_fill_strategy(&ts, &mut test1).unwrap();
1798        assert_eq!(test, test1);
1799    }
1800}