Source file: query/range_select/plan.rs
1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp::Ordering;
16use std::collections::btree_map::Entry;
17use std::collections::{BTreeMap, HashMap};
18use std::fmt::Display;
19use std::pin::Pin;
20use std::sync::Arc;
21use std::task::{Context, Poll};
22use std::time::Duration;
23
24use ahash::RandomState;
25use arrow::compute::{self, CastOptions, cast_with_options, take_arrays};
26use arrow_schema::{DataType, Field, Schema, SchemaRef, SortOptions, TimeUnit};
27use common_recordbatch::DfSendableRecordBatchStream;
28use datafusion::common::Result as DataFusionResult;
29use datafusion::error::Result as DfResult;
30use datafusion::execution::TaskContext;
31use datafusion::execution::context::SessionState;
32use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
33use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
34use datafusion::physical_plan::{
35    DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, RecordBatchStream,
36    SendableRecordBatchStream,
37};
38use datafusion_common::hash_utils::create_hashes;
39use datafusion_common::{DFSchema, DFSchemaRef, DataFusionError, ScalarValue};
40use datafusion_expr::utils::{COUNT_STAR_EXPANSION, exprlist_to_fields};
41use datafusion_expr::{
42    Accumulator, Expr, ExprSchemable, LogicalPlan, UserDefinedLogicalNodeCore, lit,
43};
44use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr};
45use datafusion_physical_expr::{
46    Distribution, EquivalenceProperties, Partitioning, PhysicalExpr, PhysicalSortExpr,
47    create_physical_expr, create_physical_sort_expr,
48};
49use datatypes::arrow::array::{
50    Array, ArrayRef, TimestampMillisecondArray, TimestampMillisecondBuilder, UInt32Builder,
51};
52use datatypes::arrow::datatypes::{ArrowPrimitiveType, TimestampMillisecondType};
53use datatypes::arrow::record_batch::RecordBatch;
54use datatypes::arrow::row::{OwnedRow, RowConverter, SortField};
55use futures::{Stream, ready};
56use futures_util::StreamExt;
57use snafu::ensure;
58
59use crate::error::{RangeQuerySnafu, Result};
60
/// Timestamp in milliseconds — the native integer type (`i64`) behind
/// Arrow's `TimestampMillisecondType`.
type Millisecond = <TimestampMillisecondType as ArrowPrimitiveType>::Native;
62
/// Strategy for filling empty slots in a range aggregation window.
#[derive(PartialEq, Eq, Debug, Hash, Clone)]
pub enum Fill {
    /// Leave missing values as NULL (no computation needed).
    Null,
    /// Carry the previous non-null value forward.
    Prev,
    /// Linearly interpolate between neighboring non-null points
    /// (rejected at parse time for non-numeric types).
    Linear,
    /// Fill missing values with a user-supplied constant.
    Const(ScalarValue),
}
70
71impl Display for Fill {
72    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
73        match self {
74            Fill::Null => write!(f, "NULL"),
75            Fill::Prev => write!(f, "PREV"),
76            Fill::Linear => write!(f, "LINEAR"),
77            Fill::Const(x) => write!(f, "{}", x),
78        }
79    }
80}
81
impl Fill {
    /// Parses a FILL option string (matched case-insensitively) into a strategy.
    ///
    /// * `""` → `Ok(None)` (no fill requested)
    /// * `"NULL"` / `"PREV"` / `"LINEAR"` → the corresponding variant;
    ///   `LINEAR` yields a plan error for non-numeric `datatype`
    /// * anything else → parsed as a constant of `datatype`
    ///
    /// NOTE(review): the constant path parses the *uppercased* input, so a
    /// string-typed fill constant would lose its original casing — confirm
    /// this is intended for non-numeric datatypes.
    pub fn try_from_str(value: &str, datatype: &DataType) -> DfResult<Option<Self>> {
        let s = value.to_uppercase();
        match s.as_str() {
            "" => Ok(None),
            "NULL" => Ok(Some(Self::Null)),
            "PREV" => Ok(Some(Self::Prev)),
            "LINEAR" => {
                if datatype.is_numeric() {
                    Ok(Some(Self::Linear))
                } else {
                    Err(DataFusionError::Plan(format!(
                        "Use FILL LINEAR on Non-numeric DataType {}",
                        datatype
                    )))
                }
            }
            _ => ScalarValue::try_from_string(s.clone(), datatype)
                .map_err(|err| {
                    DataFusionError::Plan(format!(
                        "{} is not a valid fill option, fail to convert to a const value. {{ {} }}",
                        s, err
                    ))
                })
                .map(|x| Some(Fill::Const(x))),
        }
    }

    /// The input `data` contains data on a complete time series.
    /// If the filling strategy is `PREV` or `LINEAR`, caller must be ensured that the incoming `ts`&`data` is ascending time order.
    ///
    /// Fills null slots of `data` in place. Note that with `PREV` a null at
    /// index 0 has no predecessor and therefore stays null.
    pub fn apply_fill_strategy(&self, ts: &[i64], data: &mut [ScalarValue]) -> DfResult<()> {
        // No calculation need in `Fill::Null`
        if matches!(self, Fill::Null) {
            return Ok(());
        }
        let len = data.len();
        if *self == Fill::Linear {
            return Self::fill_linear(ts, data);
        }
        for i in 0..len {
            if data[i].is_null() {
                match self {
                    Fill::Prev => {
                        if i != 0 {
                            data[i] = data[i - 1].clone()
                        }
                    }
                    // The calculation of linear interpolation is relatively complicated.
                    // `Self::fill_linear` is used to dispose `Fill::Linear`.
                    // No calculation need in `Fill::Null`
                    Fill::Linear | Fill::Null => unreachable!(),
                    Fill::Const(v) => data[i] = v.clone(),
                }
            }
        }
        Ok(())
    }

    /// Linearly interpolates null gaps of `data` in place.
    ///
    /// Interior gaps are interpolated between their two non-null neighbors;
    /// leading/trailing gaps are extrapolated from the first/last two known
    /// points. With fewer than two non-null points nothing is changed.
    fn fill_linear(ts: &[i64], data: &mut [ScalarValue]) -> DfResult<()> {
        let not_null_num = data
            .iter()
            .fold(0, |acc, x| if x.is_null() { acc } else { acc + 1 });
        // We need at least two non-empty data points to perform linear interpolation
        if not_null_num < 2 {
            return Ok(());
        }
        let mut index = 0;
        let mut head: Option<usize> = None;
        let mut tail: Option<usize> = None;
        while index < data.len() {
            // find null interval [start, end)
            // start is null, end is not-null
            let start = data[index..]
                .iter()
                .position(ScalarValue::is_null)
                .unwrap_or(data.len() - index)
                + index;
            if start == data.len() {
                break;
            }
            let end = data[start..]
                .iter()
                .position(|r| !r.is_null())
                .unwrap_or(data.len() - start)
                + start;
            index = end + 1;
            // head or tail null dispose later, record start/end first
            if start == 0 {
                head = Some(end);
            } else if end == data.len() {
                tail = Some(start);
            } else {
                linear_interpolation(ts, data, start - 1, end, start, end)?;
            }
        }
        // dispose head null interval; runs after the loop so `data[end + 1]`
        // has already been filled if it belonged to an interior gap
        if let Some(end) = head {
            linear_interpolation(ts, data, end, end + 1, 0, end)?;
        }
        // dispose tail null interval; `start >= 2` holds here, otherwise
        // fewer than two non-null points exist and we returned early above
        if let Some(start) = tail {
            linear_interpolation(ts, data, start - 2, start - 1, start, data.len())?;
        }
        Ok(())
    }
}
188
189/// use `(ts[i1], data[i1])`, `(ts[i2], data[i2])` as endpoint, linearly interpolates element over the interval `[start, end)`
190fn linear_interpolation(
191    ts: &[i64],
192    data: &mut [ScalarValue],
193    i1: usize,
194    i2: usize,
195    start: usize,
196    end: usize,
197) -> DfResult<()> {
198    let (x0, x1) = (ts[i1] as f64, ts[i2] as f64);
199    let (y0, y1, is_float32) = match (&data[i1], &data[i2]) {
200        (ScalarValue::Float64(Some(y0)), ScalarValue::Float64(Some(y1))) => (*y0, *y1, false),
201        (ScalarValue::Float32(Some(y0)), ScalarValue::Float32(Some(y1))) => {
202            (*y0 as f64, *y1 as f64, true)
203        }
204        _ => {
205            return Err(DataFusionError::Execution(
206                "RangePlan: Apply Fill LINEAR strategy on Non-floating type".to_string(),
207            ));
208        }
209    };
210    // To avoid divide zero error, kind of defensive programming
211    if x1 == x0 {
212        return Err(DataFusionError::Execution(
213            "RangePlan: Linear interpolation using the same coordinate points".to_string(),
214        ));
215    }
216    for i in start..end {
217        let val = y0 + (y1 - y0) / (x1 - x0) * (ts[i] as f64 - x0);
218        data[i] = if is_float32 {
219            ScalarValue::Float32(Some(val as f32))
220        } else {
221            ScalarValue::Float64(Some(val))
222        }
223    }
224    Ok(())
225}
226
/// One range aggregation expression of a range query.
#[derive(Eq, Clone, Debug)]
pub struct RangeFn {
    /// with format like `max(a) RANGE 300s [FILL NULL]`
    pub name: String,
    /// Output data type of this range expression.
    pub data_type: DataType,
    /// The underlying aggregate expression (e.g. `max(a)`).
    pub expr: Expr,
    /// Length of the aggregation window.
    pub range: Duration,
    /// Optional strategy for filling empty windows.
    pub fill: Option<Fill>,
    /// If the `FIll` strategy is `Linear` and the output is an integer,
    /// it is possible to calculate a floating point number.
    /// So for `FILL==LINEAR`, the entire data will be implicitly converted to Float type
    /// If `need_cast==true`, `data_type` may not consist with type `expr` generated.
    pub need_cast: bool,
}
241
impl PartialEq for RangeFn {
    // Equality is keyed on `name` alone: the name encodes the expression,
    // range and fill option, and this keeps `PartialEq` consistent with the
    // `Ord` and `Hash` impls below.
    fn eq(&self, other: &Self) -> bool {
        self.name == other.name
    }
}
247
impl PartialOrd for RangeFn {
    // Delegate to the total order defined by `Ord` (name-based).
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}
253
impl Ord for RangeFn {
    // Total order by display name, matching `PartialEq`/`Hash`.
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        self.name.cmp(&other.name)
    }
}
259
impl std::hash::Hash for RangeFn {
    // Hash only `name`, the same key used by `PartialEq`, so the
    // Eq/Hash contract holds.
    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
        self.name.hash(state);
    }
}
265
266impl Display for RangeFn {
267    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
268        write!(f, "{}", self.name)
269    }
270}
271
/// Custom logical plan node for a range query.
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct RangeSelect {
    /// The incoming logical plan
    pub input: Arc<LogicalPlan>,
    /// all range expressions
    pub range_expr: Vec<RangeFn>,
    /// Step between successive time buckets.
    pub align: Duration,
    /// Origin (epoch offset, ms) the buckets are aligned to.
    pub align_to: i64,
    /// Name of the time index column.
    pub time_index: String,
    /// Expression producing the time index column.
    pub time_expr: Expr,
    /// Expressions the output is grouped by (`ALIGN BY`).
    pub by: Vec<Expr>,
    /// Final output schema (after optional projection).
    pub schema: DFSchemaRef,
    /// Schema of the `by` columns only.
    pub by_schema: DFSchemaRef,
    /// If the `schema` of the `RangeSelect` happens to be the same as the content of the upper-level Projection Plan,
    /// the final output needs to be `project` through `schema_project`,
    /// so that we can omit the upper-level Projection Plan.
    pub schema_project: Option<Vec<usize>>,
    /// The schema before run projection, follow the order of `range expr | time index | by columns`
    /// `schema_before_project  ----  schema_project ----> schema`
    /// if `schema_project==None` then `schema_before_project==schema`
    pub schema_before_project: DFSchemaRef,
}
294
295impl PartialOrd for RangeSelect {
296    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
297        // Compare fields in order excluding `schema`, `by_schema`, `schema_before_project`.
298        match self.input.partial_cmp(&other.input) {
299            Some(Ordering::Equal) => {}
300            ord => return ord,
301        }
302        match self.range_expr.partial_cmp(&other.range_expr) {
303            Some(Ordering::Equal) => {}
304            ord => return ord,
305        }
306        match self.align.partial_cmp(&other.align) {
307            Some(Ordering::Equal) => {}
308            ord => return ord,
309        }
310        match self.align_to.partial_cmp(&other.align_to) {
311            Some(Ordering::Equal) => {}
312            ord => return ord,
313        }
314        match self.time_index.partial_cmp(&other.time_index) {
315            Some(Ordering::Equal) => {}
316            ord => return ord,
317        }
318        match self.time_expr.partial_cmp(&other.time_expr) {
319            Some(Ordering::Equal) => {}
320            ord => return ord,
321        }
322        match self.by.partial_cmp(&other.by) {
323            Some(Ordering::Equal) => {}
324            ord => return ord,
325        }
326        self.schema_project.partial_cmp(&other.schema_project)
327    }
328}
329
impl RangeSelect {
    /// Builds a `RangeSelect` node, validating durations and deriving all
    /// schemas.
    ///
    /// Errors if `align` or any expression's `range` is zero. The derived
    /// `schema_before_project` follows the fixed column order
    /// `range exprs | time index | by columns`; when every projection
    /// expression maps directly onto one of those columns, a
    /// `schema_project` index list is recorded so the upper projection plan
    /// can be elided.
    pub fn try_new(
        input: Arc<LogicalPlan>,
        range_expr: Vec<RangeFn>,
        align: Duration,
        align_to: i64,
        time_index: Expr,
        by: Vec<Expr>,
        projection_expr: &[Expr],
    ) -> Result<Self> {
        // A zero align/range would create empty (division-by-zero) buckets.
        ensure!(
            align.as_millis() != 0,
            RangeQuerySnafu {
                msg: "Can't use 0 as align in Range Query"
            }
        );
        for expr in &range_expr {
            ensure!(
                expr.range.as_millis() != 0,
                RangeQuerySnafu {
                    msg: format!(
                        "Invalid Range expr `{}`, Can't use 0 as range in Range Query",
                        expr.name
                    )
                }
            );
        }
        // One output field per range expression, unqualified.
        let mut fields = range_expr
            .iter()
            .map(
                |RangeFn {
                     name,
                     data_type,
                     fill,
                     ..
                 }| {
                    let field = Field::new(
                        name,
                        data_type.clone(),
                        // Only when data fill with Const option, the data can't be null
                        !matches!(fill, Some(Fill::Const(..))),
                    );
                    Ok((None, Arc::new(field)))
                },
            )
            .collect::<DfResult<Vec<_>>>()?;
        // add align_ts
        let ts_field = time_index.to_field(input.schema().as_ref())?;
        let time_index_name = ts_field.1.name().clone();
        fields.push(ts_field);
        // add by
        let by_fields = exprlist_to_fields(&by, &input)?;
        fields.extend(by_fields.clone());
        let schema_before_project = Arc::new(DFSchema::new_with_metadata(
            fields,
            input.schema().metadata().clone(),
        )?);
        let by_schema = Arc::new(DFSchema::new_with_metadata(
            by_fields,
            input.schema().metadata().clone(),
        )?);
        // If the results of project plan can be obtained directly from range plan without any additional
        // calculations, no project plan is required. We can simply project the final output of the range
        // plan to produce the final result.
        // (`.ok()` turns any unmapped projection expr into `None`, i.e. keep
        // the upper projection plan.)
        let schema_project = projection_expr
            .iter()
            .map(|project_expr| {
                if let Expr::Column(column) = project_expr {
                    schema_before_project
                        .index_of_column_by_name(column.relation.as_ref(), &column.name)
                        .ok_or(())
                } else {
                    let (qualifier, field) = project_expr
                        .to_field(input.schema().as_ref())
                        .map_err(|_| ())?;
                    schema_before_project
                        .index_of_column_by_name(qualifier.as_ref(), field.name())
                        .ok_or(())
                }
            })
            .collect::<std::result::Result<Vec<usize>, ()>>()
            .ok();
        // Apply the index projection (if any) to derive the output schema.
        let schema = if let Some(project) = &schema_project {
            let project_field = project
                .iter()
                .map(|i| {
                    let f = schema_before_project.qualified_field(*i);
                    (f.0.cloned(), f.1.clone())
                })
                .collect();
            Arc::new(DFSchema::new_with_metadata(
                project_field,
                input.schema().metadata().clone(),
            )?)
        } else {
            schema_before_project.clone()
        };
        Ok(Self {
            input,
            range_expr,
            align,
            align_to,
            time_index: time_index_name,
            time_expr: time_index,
            schema,
            by_schema,
            by,
            schema_project,
            schema_before_project,
        })
    }
}
442
impl UserDefinedLogicalNodeCore for RangeSelect {
    fn name(&self) -> &str {
        "RangeSelect"
    }

    fn inputs(&self) -> Vec<&LogicalPlan> {
        vec![&self.input]
    }

    fn schema(&self) -> &DFSchemaRef {
        &self.schema
    }

    /// All owned expressions in the fixed order
    /// `range exprs | time expr | by exprs`; `with_exprs_and_inputs`
    /// relies on exactly this order to rebuild the node.
    fn expressions(&self) -> Vec<Expr> {
        self.range_expr
            .iter()
            .map(|expr| expr.expr.clone())
            .chain([self.time_expr.clone()])
            .chain(self.by.clone())
            .collect()
    }

    fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(
            f,
            "RangeSelect: range_exprs=[{}], align={}ms, align_to={}ms, align_by=[{}], time_index={}",
            self.range_expr
                .iter()
                .map(ToString::to_string)
                .collect::<Vec<_>>()
                .join(", "),
            self.align.as_millis(),
            self.align_to,
            self.by
                .iter()
                .map(ToString::to_string)
                .collect::<Vec<_>>()
                .join(", "),
            self.time_index
        )
    }

    /// Rebuilds the node from replacement expressions/inputs supplied by
    /// optimizer rules. `exprs` must follow the order produced by
    /// `Self::expressions`: range exprs, then the time expr, then by exprs.
    fn with_exprs_and_inputs(
        &self,
        exprs: Vec<Expr>,
        inputs: Vec<LogicalPlan>,
    ) -> DataFusionResult<Self> {
        if inputs.is_empty() {
            return Err(DataFusionError::Plan(
                "RangeSelect: inputs is empty".to_string(),
            ));
        }
        if exprs.len() != self.range_expr.len() + self.by.len() + 1 {
            return Err(DataFusionError::Plan(
                "RangeSelect: exprs length not match".to_string(),
            ));
        }

        // Pair each replacement expression with the metadata of the RangeFn
        // it replaces (zip stops after `range_expr.len()` items).
        let range_expr = exprs
            .iter()
            .zip(self.range_expr.iter())
            .map(|(e, range)| RangeFn {
                name: range.name.clone(),
                data_type: range.data_type.clone(),
                expr: e.clone(),
                range: range.range,
                fill: range.fill.clone(),
                need_cast: range.need_cast,
            })
            .collect();
        let time_expr = exprs[self.range_expr.len()].clone();
        let by = exprs[self.range_expr.len() + 1..].to_vec();
        Ok(Self {
            align: self.align,
            align_to: self.align_to,
            range_expr,
            input: Arc::new(inputs[0].clone()),
            time_index: self.time_index.clone(),
            time_expr,
            schema: self.schema.clone(),
            by,
            by_schema: self.by_schema.clone(),
            schema_project: self.schema_project.clone(),
            schema_before_project: self.schema_before_project.clone(),
        })
    }
}
530
impl RangeSelect {
    /// Lowers `exprs` to physical expressions against `df_schema`.
    ///
    /// When `is_count_aggr` is true, a bare wildcard argument (from
    /// `count(*)`) is replaced by the literal `COUNT_STAR_EXPANSION`.
    fn create_physical_expr_list(
        &self,
        is_count_aggr: bool,
        exprs: &[Expr],
        df_schema: &Arc<DFSchema>,
        session_state: &SessionState,
    ) -> DfResult<Vec<Arc<dyn PhysicalExpr>>> {
        exprs
            .iter()
            .map(|e| match e {
                // `count(*)` will be rewritten by `CountWildcardRule` into `count(1)` when optimizing logical plan.
                // The modification occurs after range plan rewrite.
                // At this time, aggregate plan has been replaced by a custom range plan,
                // so `CountWildcardRule` has not been applied.
                // We manually modify it when creating the physical plan.
                #[expect(deprecated)]
                Expr::Wildcard { .. } if is_count_aggr => create_physical_expr(
                    &lit(COUNT_STAR_EXPANSION),
                    df_schema.as_ref(),
                    session_state.execution_props(),
                ),
                _ => create_physical_expr(e, df_schema.as_ref(), session_state.execution_props()),
            })
            .collect::<DfResult<Vec<_>>>()
    }

    /// Converts this logical node into a `RangeSelectExec` physical plan.
    ///
    /// Each `RangeFn` is lowered to a physical `AggregateFunctionExpr`;
    /// `first_value`/`last_value` get the time index as a default ordering
    /// when the user supplied none. The output schema and the optional
    /// projection index list are carried over from the logical node.
    pub fn to_execution_plan(
        &self,
        logical_input: &LogicalPlan,
        exec_input: Arc<dyn ExecutionPlan>,
        session_state: &SessionState,
    ) -> DfResult<Arc<dyn ExecutionPlan>> {
        // Strip qualifiers: physical schemas use plain Arrow fields.
        let fields: Vec<_> = self
            .schema_before_project
            .fields()
            .iter()
            .map(|field| Field::new(field.name(), field.data_type().clone(), field.is_nullable()))
            .collect();
        let by_fields: Vec<_> = self
            .by_schema
            .fields()
            .iter()
            .map(|field| Field::new(field.name(), field.data_type().clone(), field.is_nullable()))
            .collect();
        let input_dfschema = logical_input.schema();
        let input_schema = exec_input.schema();
        let range_exec: Vec<RangeFnExec> = self
            .range_expr
            .iter()
            .map(|range_fn| {
                let name = range_fn.expr.schema_name().to_string();
                // Unwrap a possible alias to reach the aggregate itself.
                let range_expr = match &range_fn.expr {
                    Expr::Alias(expr) => expr.expr.as_ref(),
                    others => others,
                };

                let expr = match &range_expr {
                    // Order-sensitive aggregates: ensure they always have an
                    // ordering, defaulting to ascending time index.
                    Expr::AggregateFunction(aggr)
                        if (aggr.func.name() == "last_value"
                            || aggr.func.name() == "first_value") =>
                    {
                        let order_by = if !aggr.params.order_by.is_empty() {
                            aggr.params
                                .order_by
                                .iter()
                                .map(|x| {
                                    create_physical_sort_expr(
                                        x,
                                        input_dfschema.as_ref(),
                                        session_state.execution_props(),
                                    )
                                })
                                .collect::<DfResult<Vec<_>>>()?
                        } else {
                            // if user not assign order by, time index is needed as default ordering
                            let time_index = create_physical_expr(
                                &self.time_expr,
                                input_dfschema.as_ref(),
                                session_state.execution_props(),
                            )?;
                            vec![PhysicalSortExpr {
                                expr: time_index,
                                options: SortOptions {
                                    descending: false,
                                    nulls_first: false,
                                },
                            }]
                        };
                        let arg = self.create_physical_expr_list(
                            false,
                            &aggr.params.args,
                            input_dfschema,
                            session_state,
                        )?;
                        // first_value/last_value has only one param.
                        // The param have been checked by datafusion in logical plan stage.
                        // We can safely assume that there is only one element here.
                        AggregateExprBuilder::new(aggr.func.clone(), arg)
                            .schema(input_schema.clone())
                            .order_by(order_by)
                            .alias(name)
                            .build()
                    }
                    Expr::AggregateFunction(aggr) => {
                        let order_by = if !aggr.params.order_by.is_empty() {
                            aggr.params
                                .order_by
                                .iter()
                                .map(|x| {
                                    create_physical_sort_expr(
                                        x,
                                        input_dfschema.as_ref(),
                                        session_state.execution_props(),
                                    )
                                })
                                .collect::<DfResult<Vec<_>>>()?
                        } else {
                            vec![]
                        };
                        let distinct = aggr.params.distinct;
                        // TODO(discord9): add default null treatment?

                        let input_phy_exprs = self.create_physical_expr_list(
                            aggr.func.name() == "count",
                            &aggr.params.args,
                            input_dfschema,
                            session_state,
                        )?;
                        AggregateExprBuilder::new(aggr.func.clone(), input_phy_exprs)
                            .schema(input_schema.clone())
                            .order_by(order_by)
                            .with_distinct(distinct)
                            .alias(name)
                            .build()
                    }
                    // Only aggregate functions are valid range expressions.
                    _ => Err(DataFusionError::Plan(format!(
                        "Unexpected Expr: {} in RangeSelect",
                        range_fn.expr
                    ))),
                }?;
                Ok(RangeFnExec {
                    expr: Arc::new(expr),
                    range: range_fn.range.as_millis() as Millisecond,
                    fill: range_fn.fill.clone(),
                    need_cast: if range_fn.need_cast {
                        Some(range_fn.data_type.clone())
                    } else {
                        None
                    },
                })
            })
            .collect::<DfResult<Vec<_>>>()?;
        let schema_before_project = Arc::new(Schema::new(fields));
        let schema = if let Some(project) = &self.schema_project {
            Arc::new(schema_before_project.project(project)?)
        } else {
            schema_before_project.clone()
        };
        let by = self.create_physical_expr_list(false, &self.by, input_dfschema, session_state)?;
        // Single unbounded partition; rows are emitted incrementally.
        let cache = Arc::new(PlanProperties::new(
            EquivalenceProperties::new(schema.clone()),
            Partitioning::UnknownPartitioning(1),
            EmissionType::Incremental,
            Boundedness::Bounded,
        ));
        Ok(Arc::new(RangeSelectExec {
            input: exec_input,
            range_exec,
            align: self.align.as_millis() as Millisecond,
            align_to: self.align_to,
            by,
            time_index: self.time_index.clone(),
            schema,
            by_schema: Arc::new(Schema::new(by_fields)),
            metric: ExecutionPlanMetricsSet::new(),
            schema_before_project,
            schema_project: self.schema_project.clone(),
            cache,
        }))
    }
}
713
/// Range function expression.
#[derive(Debug, Clone)]
struct RangeFnExec {
    /// Physical aggregate evaluated per time window.
    expr: Arc<AggregateFunctionExpr>,
    /// Window length in milliseconds.
    range: Millisecond,
    /// Optional strategy for filling empty windows.
    fill: Option<Fill>,
    /// Target type to cast results to (set when FILL LINEAR forces a float type).
    need_cast: Option<DataType>,
}
722
723impl RangeFnExec {
724    /// Returns the expressions to pass to the aggregator.
725    /// It also adds the order by expressions to the list of expressions.
726    /// Order-sensitive aggregators, such as `FIRST_VALUE(x ORDER BY y)` requires this.
727    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
728        let mut exprs = self.expr.expressions();
729        exprs.extend(self.expr.order_bys().iter().map(|sort| sort.expr.clone()));
730        exprs
731    }
732}
733
734impl Display for RangeFnExec {
735    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
736        if let Some(fill) = &self.fill {
737            write!(
738                f,
739                "{} RANGE {}s FILL {}",
740                self.expr.name(),
741                self.range / 1000,
742                fill
743            )
744        } else {
745            write!(f, "{} RANGE {}s", self.expr.name(), self.range / 1000)
746        }
747    }
748}
749
/// Physical plan node executing a range query.
#[derive(Debug)]
pub struct RangeSelectExec {
    /// Child plan supplying the input rows.
    input: Arc<dyn ExecutionPlan>,
    /// One entry per range aggregation expression.
    range_exec: Vec<RangeFnExec>,
    /// Bucket step in milliseconds.
    align: Millisecond,
    /// Bucket alignment origin (epoch offset, ms).
    align_to: i64,
    /// Name of the time index column in the input.
    time_index: String,
    /// Physical grouping expressions (`ALIGN BY`).
    by: Vec<Arc<dyn PhysicalExpr>>,
    /// Final output schema (after optional projection).
    schema: SchemaRef,
    /// Schema of the `by` columns only.
    by_schema: SchemaRef,
    /// Execution metrics collector.
    metric: ExecutionPlanMetricsSet,
    /// Column indices projecting `schema_before_project` onto `schema`, if any.
    schema_project: Option<Vec<usize>>,
    /// Schema in `range expr | time index | by columns` order, pre-projection.
    schema_before_project: SchemaRef,
    /// Cached plan properties.
    cache: Arc<PlanProperties>,
}
765
766impl DisplayAs for RangeSelectExec {
767    fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
768        match t {
769            DisplayFormatType::Default
770            | DisplayFormatType::Verbose
771            | DisplayFormatType::TreeRender => {
772                write!(f, "RangeSelectExec: ")?;
773                let range_expr_strs: Vec<String> =
774                    self.range_exec.iter().map(RangeFnExec::to_string).collect();
775                let by: Vec<String> = self.by.iter().map(|e| e.to_string()).collect();
776                write!(
777                    f,
778                    "range_expr=[{}], align={}ms, align_to={}ms, align_by=[{}], time_index={}",
779                    range_expr_strs.join(", "),
780                    self.align,
781                    self.align_to,
782                    by.join(", "),
783                    self.time_index,
784                )?;
785            }
786        }
787        Ok(())
788    }
789}
790
791impl ExecutionPlan for RangeSelectExec {
    /// Enables downcasting from `dyn ExecutionPlan`.
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }
795
    /// Output schema (after the optional projection).
    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }
799
    /// Range aggregation groups rows across the whole input by time bucket
    /// and `by` values, so all input must arrive in a single partition.
    fn required_input_distribution(&self) -> Vec<Distribution> {
        vec![Distribution::SinglePartition]
    }
803
    /// Cached plan properties built in `RangeSelect::to_execution_plan`.
    fn properties(&self) -> &Arc<PlanProperties> {
        &self.cache
    }
807
    /// This node has exactly one child: its input plan.
    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![&self.input]
    }
811
812    fn with_new_children(
813        self: Arc<Self>,
814        children: Vec<Arc<dyn ExecutionPlan>>,
815    ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
816        assert!(!children.is_empty());
817        Ok(Arc::new(Self {
818            input: children[0].clone(),
819            range_exec: self.range_exec.clone(),
820            time_index: self.time_index.clone(),
821            by: self.by.clone(),
822            align: self.align,
823            align_to: self.align_to,
824            schema: self.schema.clone(),
825            by_schema: self.by_schema.clone(),
826            metric: self.metric.clone(),
827            schema_before_project: self.schema_before_project.clone(),
828            schema_project: self.schema_project.clone(),
829            cache: self.cache.clone(),
830        }))
831    }
832
833    fn execute(
834        &self,
835        partition: usize,
836        context: Arc<TaskContext>,
837    ) -> DfResult<DfSendableRecordBatchStream> {
838        let baseline_metric = BaselineMetrics::new(&self.metric, partition);
839        let batch_size = context.session_config().batch_size();
840        let input = self.input.execute(partition, context)?;
841        let schema = input.schema();
842        let time_index = schema
843            .column_with_name(&self.time_index)
844            .ok_or(DataFusionError::Execution(
845                "time index column not found".into(),
846            ))?
847            .0;
848        let row_converter = RowConverter::new(
849            self.by_schema
850                .fields()
851                .iter()
852                .map(|f| SortField::new(f.data_type().clone()))
853                .collect(),
854        )?;
855        Ok(Box::pin(RangeSelectStream {
856            batch_size,
857            schema: self.schema.clone(),
858            range_exec: self.range_exec.clone(),
859            input,
860            random_state: RandomState::new(),
861            time_index,
862            align: self.align,
863            align_to: self.align_to,
864            by: self.by.clone(),
865            series_map: HashMap::new(),
866            exec_state: ExecutionState::ReadingInput,
867            num_not_null_rows: 0,
868            row_converter,
869            modify_map: HashMap::new(),
870            metric: baseline_metric,
871            schema_project: self.schema_project.clone(),
872            schema_before_project: self.schema_before_project.clone(),
873            output_batch: None,
874            output_batch_offset: 0,
875        }))
876    }
877
878    fn metrics(&self) -> Option<MetricsSet> {
879        Some(self.metric.clone_inner())
880    }
881
882    fn name(&self) -> &str {
883        "RanegSelectExec"
884    }
885}
886
/// Stream that drains the input, buckets each row into per-series
/// `(align_ts -> accumulators)` state, then emits the aggregated output in
/// `batch_size`-row chunks.
struct RangeSelectStream {
    /// Maximum number of rows per emitted output batch.
    batch_size: usize,
    /// the schema of output column
    schema: SchemaRef,
    /// Range aggregate expressions; one accumulator per expression is kept
    /// for every `(series, align_ts)` slot.
    range_exec: Vec<RangeFnExec>,
    /// Upstream input stream being aggregated.
    input: SendableRecordBatchStream,
    /// Column index of TIME INDEX column's position in the input schema
    time_index: usize,
    /// the unit of `align` is millisecond
    align: Millisecond,
    /// Origin timestamp (ms) that aligned slots are anchored to.
    align_to: i64,
    /// Expressions producing the `BY` (series key) values.
    by: Vec<Arc<dyn PhysicalExpr>>,
    /// Current phase of the read-then-emit state machine.
    exec_state: ExecutionState,
    /// Converter for the by values
    row_converter: RowConverter,
    /// Hasher state used to bucket rows by their `BY` values.
    random_state: RandomState,
    /// key: time series's hash value
    /// value: time series's state on different align_ts
    series_map: HashMap<u64, SeriesState>,
    /// key: `(hash of by rows, align_ts)`
    /// value: `[row_ids]`
    /// It is used to record the data that needs to be aggregated in each time slot during the data update process
    modify_map: HashMap<(u64, Millisecond), Vec<u32>>,
    /// The number of rows of not null rows in the final output
    num_not_null_rows: usize,
    /// Baseline metrics (e.g. elapsed compute time).
    metric: BaselineMetrics,
    /// Optional projection applied to the `schema_before_project` columns.
    schema_project: Option<Vec<usize>>,
    /// Schema in `range exprs | time index | by columns` order, pre-projection.
    schema_before_project: SchemaRef,
    /// Fully materialized output, sliced lazily into `batch_size` chunks.
    output_batch: Option<RecordBatch>,
    /// Offset of the next row to emit from `output_batch`.
    output_batch_offset: usize,
}
918
/// Aggregation state for a single time series (one distinct `BY` value).
#[derive(Debug)]
struct SeriesState {
    /// by values written by `RowWriter`
    row: OwnedRow,
    /// key: align_ts
    /// value: a vector, each element is a range_fn follow the order of `range_exec`
    align_ts_accumulator: BTreeMap<Millisecond, Vec<Box<dyn Accumulator>>>,
}
927
928/// Use `align_to` as time origin.
929/// According to `align` as time interval, produces aligned time.
930/// Combining the parameters related to the range query,
931/// determine for each `Accumulator` `(hash, align_ts)` define,
932/// which rows of data will be applied to it.
933fn produce_align_time(
934    align_to: i64,
935    range: Millisecond,
936    align: Millisecond,
937    ts_column: &TimestampMillisecondArray,
938    by_columns_hash: &[u64],
939    modify_map: &mut HashMap<(u64, Millisecond), Vec<u32>>,
940) {
941    modify_map.clear();
942    // make modify_map for range_fn[i]
943    for (row, hash) in by_columns_hash.iter().enumerate() {
944        let ts = ts_column.value(row);
945        let ith_slot = (ts - align_to).div_floor(align);
946        let mut align_ts = ith_slot * align + align_to;
947        while align_ts <= ts && ts < align_ts + range {
948            modify_map
949                .entry((*hash, align_ts))
950                .or_default()
951                .push(row as u32);
952            align_ts -= align;
953        }
954    }
955}
956
957fn cast_scalar_values(values: &mut [ScalarValue], data_type: &DataType) -> DfResult<()> {
958    let array = ScalarValue::iter_to_array(values.to_vec())?;
959    let cast_array = cast_with_options(&array, data_type, &CastOptions::default())?;
960    for (i, value) in values.iter_mut().enumerate() {
961        *value = ScalarValue::try_from_array(&cast_array, i)?;
962    }
963    Ok(())
964}
965
impl RangeSelectStream {
    /// Evaluates each physical expression against `batch` and returns the
    /// resulting arrays, one per expression.
    fn evaluate_many(
        &self,
        batch: &RecordBatch,
        exprs: &[Arc<dyn PhysicalExpr>],
    ) -> DfResult<Vec<ArrayRef>> {
        exprs
            .iter()
            .map(|expr| {
                let value = expr.evaluate(batch)?;
                value.into_array(batch.num_rows())
            })
            .collect::<DfResult<Vec<_>>>()
    }

    /// Folds one input batch into the per-series accumulator state.
    ///
    /// Hashes the `by` values of every row, maps each row to the aligned
    /// time slots it belongs to (via [`produce_align_time`]), and feeds the
    /// relevant row slices into the matching accumulators in `series_map`.
    fn update_range_context(&mut self, batch: RecordBatch) -> DfResult<()> {
        let _timer = self.metric.elapsed_compute().timer();
        let num_rows = batch.num_rows();
        let by_arrays = self.evaluate_many(&batch, &self.by)?;
        let mut hashes = vec![0; num_rows];
        create_hashes(&by_arrays, &self.random_state, &mut hashes)?;
        let by_rows = self.row_converter.convert_columns(&by_arrays)?;
        // Normalize the time index to millisecond precision so slot math
        // below can work on `TimestampMillisecondArray` values.
        let mut ts_column = batch.column(self.time_index).clone();
        if !matches!(
            ts_column.data_type(),
            DataType::Timestamp(TimeUnit::Millisecond, _)
        ) {
            ts_column = compute::cast(
                ts_column.as_ref(),
                &DataType::Timestamp(TimeUnit::Millisecond, None),
            )?;
        }
        let ts_column_ref = ts_column
            .as_any()
            .downcast_ref::<TimestampMillisecondArray>()
            .ok_or_else(|| {
                DataFusionError::Execution(
                    "Time index Column downcast to TimestampMillisecondArray failed".into(),
                )
            })?;
        // Each range expression may have its own `range`, so the slot
        // assignment is recomputed per expression.
        for i in 0..self.range_exec.len() {
            let args = self.evaluate_many(&batch, &self.range_exec[i].expressions())?;
            // use self.modify_map record (hash, align_ts) => [row_nums]
            produce_align_time(
                self.align_to,
                self.range_exec[i].range,
                self.align,
                ts_column_ref,
                &hashes,
                &mut self.modify_map,
            );
            // build modify_rows/modify_index/offsets for batch update
            let mut modify_rows = UInt32Builder::with_capacity(0);
            // (hash, align_ts, row_num)
            // row_num use to find a by value
            // So we just need to record the row_num of a modify row randomly, because they all have the same by value
            let mut modify_index = Vec::with_capacity(self.modify_map.len());
            let mut offsets = vec![0];
            let mut offset_so_far = 0;
            for ((hash, ts), modify) in &self.modify_map {
                modify_rows.append_slice(modify);
                offset_so_far += modify.len();
                offsets.push(offset_so_far);
                modify_index.push((*hash, *ts, modify[0]));
            }
            // Gather all affected rows once, then hand each slot its
            // contiguous slice of the gathered arrays.
            let modify_rows = modify_rows.finish();
            let args = take_arrays(&args, &modify_rows, None)?;
            modify_index.iter().zip(offsets.windows(2)).try_for_each(
                |((hash, ts, row), offset)| {
                    let (offset, length) = (offset[0], offset[1] - offset[0]);
                    let sliced_arrays: Vec<ArrayRef> = args
                        .iter()
                        .map(|array| array.slice(offset, length))
                        .collect();
                    let accumulators_map =
                        self.series_map.entry(*hash).or_insert_with(|| SeriesState {
                            row: by_rows.row(*row as usize).owned(),
                            align_ts_accumulator: BTreeMap::new(),
                        });
                    match accumulators_map.align_ts_accumulator.entry(*ts) {
                        Entry::Occupied(mut e) => {
                            let accumulators = e.get_mut();
                            accumulators[i].update_batch(&sliced_arrays)
                        }
                        // First touch of this slot: create one accumulator
                        // per range expression, then update the i-th one.
                        Entry::Vacant(e) => {
                            self.num_not_null_rows += 1;
                            let mut accumulators = self
                                .range_exec
                                .iter()
                                .map(|range| range.expr.create_accumulator())
                                .collect::<DfResult<Vec<_>>>()?;
                            let result = accumulators[i].update_batch(&sliced_arrays);
                            e.insert(accumulators);
                            result
                        }
                    }
                },
            )?;
        }
        Ok(())
    }

    /// Materializes all accumulated series into a single output batch,
    /// applying fill strategies and the optional cast/projection.
    fn generate_output(&mut self) -> DfResult<RecordBatch> {
        let _timer = self.metric.elapsed_compute().timer();
        if self.series_map.is_empty() {
            return Ok(RecordBatch::new_empty(self.schema.clone()));
        }
        // 1 for time index column
        let mut columns: Vec<Arc<dyn Array>> =
            Vec::with_capacity(1 + self.range_exec.len() + self.by.len());
        let mut ts_builder = TimestampMillisecondBuilder::with_capacity(self.num_not_null_rows);
        let mut all_scalar =
            vec![Vec::with_capacity(self.num_not_null_rows); self.range_exec.len()];
        let mut by_rows = Vec::with_capacity(self.num_not_null_rows);
        let mut start_index = 0;
        // If any range expr need fill, we need fill both the missing align_ts and null value.
        let need_fill_output = self.range_exec.iter().any(|range| range.fill.is_some());
        // The padding value for each accumulator (the value a fresh,
        // never-updated accumulator evaluates to).
        let padding_values = self
            .range_exec
            .iter()
            .map(|e| e.expr.create_accumulator()?.evaluate())
            .collect::<DfResult<Vec<_>>>()?;
        for SeriesState {
            row,
            align_ts_accumulator,
        } in self.series_map.values_mut()
        {
            // skip empty time series
            if align_ts_accumulator.is_empty() {
                continue;
            }
            // find the first and last align_ts
            let begin_ts = *align_ts_accumulator.first_key_value().unwrap().0;
            let end_ts = *align_ts_accumulator.last_key_value().unwrap().0;
            let align_ts = if need_fill_output {
                // enumerate every slot in the series' range, including slots
                // that received no data (they get padded below)
                (begin_ts..=end_ts).step_by(self.align as usize).collect()
            } else {
                align_ts_accumulator.keys().copied().collect::<Vec<_>>()
            };
            for ts in &align_ts {
                if let Some(slot) = align_ts_accumulator.get_mut(ts) {
                    for (column, acc) in all_scalar.iter_mut().zip(slot.iter_mut()) {
                        column.push(acc.evaluate()?);
                    }
                } else {
                    // fill padding value in an empty time slot
                    for (column, padding) in all_scalar.iter_mut().zip(padding_values.iter()) {
                        column.push(padding.clone())
                    }
                }
            }
            ts_builder.append_slice(&align_ts);
            // apply fill strategy on time series
            for (
                i,
                RangeFnExec {
                    fill, need_cast, ..
                },
            ) in self.range_exec.iter().enumerate()
            {
                // this series' slice of the i-th output column
                let time_series_data =
                    &mut all_scalar[i][start_index..start_index + align_ts.len()];
                if let Some(data_type) = need_cast {
                    cast_scalar_values(time_series_data, data_type)?;
                }
                if let Some(fill) = fill {
                    fill.apply_fill_strategy(&align_ts, time_series_data)?;
                }
            }
            // repeat the series' by-row once per emitted slot
            by_rows.resize(by_rows.len() + align_ts.len(), row.row());
            start_index += align_ts.len();
        }
        for column_scalar in all_scalar {
            columns.push(ScalarValue::iter_to_array(column_scalar)?);
        }
        let ts_column = ts_builder.finish();
        // output schema before project follow the order of range expr | time index | by columns
        let ts_column = compute::cast(
            &ts_column,
            self.schema_before_project.field(columns.len()).data_type(),
        )?;
        columns.push(ts_column);
        columns.extend(self.row_converter.convert_rows(by_rows)?);
        let output = RecordBatch::try_new(self.schema_before_project.clone(), columns)?;
        let project_output = if let Some(project) = &self.schema_project {
            output.project(project)?
        } else {
            output
        };
        Ok(project_output)
    }

    /// Returns the next `batch_size`-row slice of the output, generating the
    /// full output lazily on first call. `Ok(None)` means the output is empty.
    fn next_output_batch(&mut self) -> DfResult<Option<RecordBatch>> {
        if self.output_batch.is_none() {
            self.output_batch = Some(self.generate_output()?);
            self.output_batch_offset = 0;
        }

        let num_rows = self.output_batch.as_ref().unwrap().num_rows();
        if num_rows == 0 {
            self.output_batch = None;
            self.output_batch_offset = 0;
            return Ok(None);
        }

        // Fast path: the whole output fits into one batch, hand it over
        // without slicing.
        if self.output_batch_offset == 0 && num_rows <= self.batch_size {
            return Ok(self.output_batch.take());
        }

        let offset = self.output_batch_offset;
        let len = (num_rows - offset).min(self.batch_size);
        let batch = self.output_batch.as_ref().unwrap().slice(offset, len);
        self.output_batch_offset += len;

        // Drop the buffered output once fully consumed so the caller can
        // detect completion via `output_batch.is_none()`.
        if self.output_batch_offset >= num_rows {
            self.output_batch = None;
            self.output_batch_offset = 0;
        }

        Ok(Some(batch))
    }
}
1190
/// Phases of [`RangeSelectStream`]'s poll loop.
enum ExecutionState {
    /// Still draining the input stream into the accumulators.
    ReadingInput,
    /// Input exhausted; emitting output batches.
    ProducingOutput,
    /// All output emitted; the stream is finished.
    Done,
}
1196
1197impl RecordBatchStream for RangeSelectStream {
1198    fn schema(&self) -> SchemaRef {
1199        self.schema.clone()
1200    }
1201}
1202
impl Stream for RangeSelectStream {
    type Item = DataFusionResult<RecordBatch>;

    /// State machine: drain the input into the accumulators
    /// (`ReadingInput`), then emit the aggregated output in `batch_size`
    /// chunks (`ProducingOutput`), then terminate (`Done`).
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        loop {
            match self.exec_state {
                ExecutionState::ReadingInput => {
                    match ready!(self.input.poll_next_unpin(cx)) {
                        // new batch to aggregate
                        Some(Ok(batch)) => {
                            if let Err(e) = self.update_range_context(batch) {
                                common_telemetry::debug!(
                                    "RangeSelectStream cannot update range context, schema: {:?}, err: {:?}",
                                    self.schema,
                                    e
                                );
                                return Poll::Ready(Some(Err(e)));
                            }
                        }
                        // inner had error, return to caller
                        Some(Err(e)) => return Poll::Ready(Some(Err(e))),
                        // inner is done, producing output
                        None => {
                            self.exec_state = ExecutionState::ProducingOutput;
                        }
                    }
                }
                ExecutionState::ProducingOutput => {
                    let result = self.next_output_batch();
                    return match result {
                        // made output
                        Ok(Some(batch)) => {
                            // `output_batch` cleared => that was the last slice
                            if self.output_batch.is_none() {
                                self.exec_state = ExecutionState::Done;
                            }
                            Poll::Ready(Some(Ok(batch)))
                        }
                        Ok(None) => {
                            self.exec_state = ExecutionState::Done;
                            Poll::Ready(None)
                        }
                        // error making output
                        Err(error) => Poll::Ready(Some(Err(error))),
                    };
                }
                ExecutionState::Done => return Poll::Ready(None),
            }
        }
    }
}
1253
1254#[cfg(test)]
1255mod test {
    // Builds an Arrow array whose entries may be `null`, e.g.
    // `nullable_array!(Int64; 0, null, 2)` yields an `Int64Array`.
    macro_rules! nullable_array {
        // Terminal case: no more values to append.
        ($builder:ident,) => {
        };
        // Entry point: `Type; v1, v2, ...` — build `<Type>Builder`, fill it, finish.
        ($array_type:ident ; $($tail:tt)*) => {
            paste::item! {
                {
                    let mut builder = arrow::array::[<$array_type Builder>]::new();
                    nullable_array!(builder, $($tail)*);
                    builder.finish()
                }
            }
        };
        // Trailing `null`.
        ($builder:ident, null) => {
            $builder.append_null();
        };
        // `null` followed by more values.
        ($builder:ident, null, $($tail:tt)*) => {
            $builder.append_null();
            nullable_array!($builder, $($tail)*);
        };
        // Trailing literal value.
        ($builder:ident, $value:literal) => {
            $builder.append_value($value);
        };
        // Literal value followed by more values.
        ($builder:ident, $value:literal, $($tail:tt)*) => {
            $builder.append_value($value);
            nullable_array!($builder, $($tail)*);
        };
    }
1283
1284    use std::sync::Arc;
1285
1286    use arrow_schema::SortOptions;
1287    use datafusion::arrow::datatypes::{
1288        ArrowPrimitiveType, DataType, Field, Schema, TimestampMillisecondType,
1289    };
1290    use datafusion::datasource::memory::MemorySourceConfig;
1291    use datafusion::datasource::source::DataSourceExec;
1292    use datafusion::functions_aggregate::min_max;
1293    use datafusion::physical_plan::sorts::sort::SortExec;
1294    use datafusion::prelude::SessionContext;
1295    use datafusion_physical_expr::PhysicalSortExpr;
1296    use datafusion_physical_expr::expressions::Column;
1297    use datatypes::arrow::array::{Float64Array, Int64Array, TimestampMillisecondArray};
1298    use datatypes::arrow_array::StringArray;
1299
1300    use super::*;
1301
    /// Name of the time index column shared by all test fixtures.
    const TIME_INDEX_COLUMN: &str = "timestamp";
1303
    /// Builds an in-memory source with two hosts (`host1`, `host2`).
    ///
    /// * `is_float` — value column is `Float64` instead of `Int64`.
    /// * `is_gap` — only two points per host (0s and 15s), leaving gaps at
    ///   5s and 10s; otherwise one point every 5s from 0s to 20s with nulls
    ///   interleaved.
    fn prepare_test_data(is_float: bool, is_gap: bool) -> DataSourceExec {
        let schema = Arc::new(Schema::new(vec![
            Field::new(TIME_INDEX_COLUMN, TimestampMillisecondType::DATA_TYPE, true),
            Field::new(
                "value",
                if is_float {
                    DataType::Float64
                } else {
                    DataType::Int64
                },
                true,
            ),
            Field::new("host", DataType::Utf8, true),
        ]));
        let timestamp_column: Arc<dyn Array> = if !is_gap {
            Arc::new(TimestampMillisecondArray::from(vec![
                0, 5_000, 10_000, 15_000, 20_000, // host 1 every 5s
                0, 5_000, 10_000, 15_000, 20_000, // host 2 every 5s
            ])) as _
        } else {
            Arc::new(TimestampMillisecondArray::from(vec![
                0, 15_000, // host 1 every 5s, missing data on 5_000, 10_000
                0, 15_000, // host 2 every 5s, missing data on 5_000, 10_000
            ])) as _
        };
        // first half of rows belong to host1, second half to host2
        let mut host = vec!["host1"; timestamp_column.len() / 2];
        host.extend(vec!["host2"; timestamp_column.len() / 2]);
        let mut value_column: Arc<dyn Array> = if is_gap {
            Arc::new(nullable_array!(Int64;
                0, 6, // data for host 1
                6, 12 // data for host 2
            )) as _
        } else {
            Arc::new(nullable_array!(Int64;
                0, null, 1, null, 2, // data for host 1
                3, null, 4, null, 5 // data for host 2
            )) as _
        };
        if is_float {
            value_column =
                cast_with_options(&value_column, &DataType::Float64, &CastOptions::default())
                    .unwrap();
        }
        let host_column: Arc<dyn Array> = Arc::new(StringArray::from(host)) as _;
        let data = RecordBatch::try_new(
            schema.clone(),
            vec![timestamp_column, value_column, host_column],
        )
        .unwrap();

        DataSourceExec::new(Arc::new(
            MemorySourceConfig::try_new(&[vec![data]], schema, None).unwrap(),
        ))
    }
1358
    /// Builds an in-memory source with the same three-column schema as
    /// [`prepare_test_data`] but zero rows.
    fn prepare_empty_test_data(is_float: bool) -> DataSourceExec {
        let schema = Arc::new(Schema::new(vec![
            Field::new(TIME_INDEX_COLUMN, TimestampMillisecondType::DATA_TYPE, true),
            Field::new(
                "value",
                if is_float {
                    DataType::Float64
                } else {
                    DataType::Int64
                },
                true,
            ),
            Field::new("host", DataType::Utf8, true),
        ]));
        let timestamp_column: Arc<dyn Array> =
            Arc::new(TimestampMillisecondArray::from(Vec::<i64>::new())) as _;
        let value_column: Arc<dyn Array> = if is_float {
            Arc::new(Float64Array::from(Vec::<Option<f64>>::new())) as _
        } else {
            Arc::new(Int64Array::from(Vec::<Option<i64>>::new())) as _
        };
        let host_column: Arc<dyn Array> =
            Arc::new(StringArray::from(Vec::<Option<&str>>::new())) as _;
        let data = RecordBatch::try_new(
            schema.clone(),
            vec![timestamp_column, value_column, host_column],
        )
        .unwrap();

        DataSourceExec::new(Arc::new(
            MemorySourceConfig::try_new(&[vec![data]], schema, None).unwrap(),
        ))
    }
1392
    /// Runs a `RangeSelectExec` computing `MIN(value)` over `range1` and
    /// `MAX(value)` over `range2`, aligned every `align` ms (origin 0) and
    /// grouped by `host`, then collects the batches sorted by
    /// `(host, timestamp)`.
    async fn collect_range_select_test(
        range1: Millisecond,
        range2: Millisecond,
        align: Millisecond,
        fill: Option<Fill>,
        is_float: bool,
        is_gap: bool,
        batch_size: usize,
    ) -> Vec<RecordBatch> {
        let data_type = if is_float {
            DataType::Float64
        } else {
            DataType::Int64
        };
        let (need_cast, schema_data_type) = if !is_float && matches!(fill, Some(Fill::Linear)) {
            // Linear interpolation over integer input produces fractional
            // values, so the output columns must be cast to Float64.
            (Some(DataType::Float64), DataType::Float64)
        } else {
            (None, data_type.clone())
        };
        let memory_exec = Arc::new(prepare_test_data(is_float, is_gap));
        let schema = Arc::new(Schema::new(vec![
            Field::new("MIN(value)", schema_data_type.clone(), true),
            Field::new("MAX(value)", schema_data_type, true),
            Field::new(TIME_INDEX_COLUMN, TimestampMillisecondType::DATA_TYPE, true),
            Field::new("host", DataType::Utf8, true),
        ]));
        let cache = Arc::new(PlanProperties::new(
            EquivalenceProperties::new(schema.clone()),
            Partitioning::UnknownPartitioning(1),
            EmissionType::Incremental,
            Boundedness::Bounded,
        ));
        let input_schema = memory_exec.schema().clone();
        let range_select_exec = Arc::new(RangeSelectExec {
            input: memory_exec,
            range_exec: vec![
                RangeFnExec {
                    expr: Arc::new(
                        AggregateExprBuilder::new(
                            min_max::min_udaf(),
                            vec![Arc::new(Column::new("value", 1))],
                        )
                        .schema(input_schema.clone())
                        .alias("MIN(value)")
                        .build()
                        .unwrap(),
                    ),
                    range: range1,
                    fill: fill.clone(),
                    need_cast: need_cast.clone(),
                },
                RangeFnExec {
                    expr: Arc::new(
                        AggregateExprBuilder::new(
                            min_max::max_udaf(),
                            vec![Arc::new(Column::new("value", 1))],
                        )
                        .schema(input_schema.clone())
                        .alias("MAX(value)")
                        .build()
                        .unwrap(),
                    ),
                    range: range2,
                    fill,
                    need_cast,
                },
            ],
            align,
            align_to: 0,
            by: vec![Arc::new(Column::new("host", 2))],
            time_index: TIME_INDEX_COLUMN.to_string(),
            schema: schema.clone(),
            schema_before_project: schema.clone(),
            schema_project: None,
            by_schema: Arc::new(Schema::new(vec![Field::new("host", DataType::Utf8, true)])),
            metric: ExecutionPlanMetricsSet::new(),
            cache,
        });
        // Sort the output by (host, timestamp) so expected strings are stable.
        let sort_exec = SortExec::new(
            [
                PhysicalSortExpr {
                    expr: Arc::new(Column::new("host", 3)),
                    options: SortOptions {
                        descending: false,
                        nulls_first: true,
                    },
                },
                PhysicalSortExpr {
                    expr: Arc::new(Column::new(TIME_INDEX_COLUMN, 2)),
                    options: SortOptions {
                        descending: false,
                        nulls_first: true,
                    },
                },
            ]
            .into(),
            range_select_exec,
        );
        let session_context = SessionContext::new_with_config(
            datafusion::execution::config::SessionConfig::new().with_batch_size(batch_size),
        );
        datafusion::physical_plan::collect(Arc::new(sort_exec), session_context.task_ctx())
            .await
            .unwrap()
    }
1499
1500    async fn do_range_select_test(
1501        range1: Millisecond,
1502        range2: Millisecond,
1503        align: Millisecond,
1504        fill: Option<Fill>,
1505        is_float: bool,
1506        is_gap: bool,
1507        expected: String,
1508    ) {
1509        let result =
1510            collect_range_select_test(range1, range2, align, fill, is_float, is_gap, 8192).await;
1511
1512        let result_literal = arrow::util::pretty::pretty_format_batches(&result)
1513            .unwrap()
1514            .to_string();
1515
1516        assert_eq!(result_literal, expected);
1517    }
1518
    // An `align` (1000s) far larger than the data span collapses each host's
    // series into a single slot at the origin.
    #[tokio::test]
    async fn range_10s_align_1000s() {
        let expected = String::from(
            "+------------+------------+---------------------+-------+\
            \n| MIN(value) | MAX(value) | timestamp           | host  |\
            \n+------------+------------+---------------------+-------+\
            \n| 0.0        | 0.0        | 1970-01-01T00:00:00 | host1 |\
            \n| 3.0        | 3.0        | 1970-01-01T00:00:00 | host2 |\
            \n+------------+------------+---------------------+-------+",
        );
        do_range_select_test(
            10_000,
            10_000,
            1_000_000,
            Some(Fill::Null),
            true,
            false,
            expected,
        )
        .await;
    }
1540
    // MIN over 10s / MAX over 5s, aligned every 5s; slots where the 5s MAX
    // window has no data are filled with null.
    #[tokio::test]
    async fn range_fill_null() {
        let expected = String::from(
            "+------------+------------+---------------------+-------+\
            \n| MIN(value) | MAX(value) | timestamp           | host  |\
            \n+------------+------------+---------------------+-------+\
            \n| 0.0        |            | 1969-12-31T23:59:55 | host1 |\
            \n| 0.0        | 0.0        | 1970-01-01T00:00:00 | host1 |\
            \n| 1.0        |            | 1970-01-01T00:00:05 | host1 |\
            \n| 1.0        | 1.0        | 1970-01-01T00:00:10 | host1 |\
            \n| 2.0        |            | 1970-01-01T00:00:15 | host1 |\
            \n| 2.0        | 2.0        | 1970-01-01T00:00:20 | host1 |\
            \n| 3.0        |            | 1969-12-31T23:59:55 | host2 |\
            \n| 3.0        | 3.0        | 1970-01-01T00:00:00 | host2 |\
            \n| 4.0        |            | 1970-01-01T00:00:05 | host2 |\
            \n| 4.0        | 4.0        | 1970-01-01T00:00:10 | host2 |\
            \n| 5.0        |            | 1970-01-01T00:00:15 | host2 |\
            \n| 5.0        | 5.0        | 1970-01-01T00:00:20 | host2 |\
            \n+------------+------------+---------------------+-------+",
        );
        do_range_select_test(
            10_000,
            5_000,
            5_000,
            Some(Fill::Null),
            true,
            false,
            expected,
        )
        .await;
    }
1572
    // Same layout as `range_fill_null`, but empty MAX slots take the
    // previous slot's value (the very first slot of a series stays null).
    #[tokio::test]
    async fn range_fill_prev() {
        let expected = String::from(
            "+------------+------------+---------------------+-------+\
            \n| MIN(value) | MAX(value) | timestamp           | host  |\
            \n+------------+------------+---------------------+-------+\
            \n| 0.0        |            | 1969-12-31T23:59:55 | host1 |\
            \n| 0.0        | 0.0        | 1970-01-01T00:00:00 | host1 |\
            \n| 1.0        | 0.0        | 1970-01-01T00:00:05 | host1 |\
            \n| 1.0        | 1.0        | 1970-01-01T00:00:10 | host1 |\
            \n| 2.0        | 1.0        | 1970-01-01T00:00:15 | host1 |\
            \n| 2.0        | 2.0        | 1970-01-01T00:00:20 | host1 |\
            \n| 3.0        |            | 1969-12-31T23:59:55 | host2 |\
            \n| 3.0        | 3.0        | 1970-01-01T00:00:00 | host2 |\
            \n| 4.0        | 3.0        | 1970-01-01T00:00:05 | host2 |\
            \n| 4.0        | 4.0        | 1970-01-01T00:00:10 | host2 |\
            \n| 5.0        | 4.0        | 1970-01-01T00:00:15 | host2 |\
            \n| 5.0        | 5.0        | 1970-01-01T00:00:20 | host2 |\
            \n+------------+------------+---------------------+-------+",
        );
        do_range_select_test(
            10_000,
            5_000,
            5_000,
            Some(Fill::Prev),
            true,
            false,
            expected,
        )
        .await;
    }
1604
    #[tokio::test]
    async fn range_fill_linear() {
        // FILL LINEAR: empty slots are linearly interpolated between the
        // neighboring values; slots at the series edge are extrapolated from
        // the nearest segment (hence -0.5 before the first data point).
        let expected = String::from(
            "+------------+------------+---------------------+-------+\
            \n| MIN(value) | MAX(value) | timestamp           | host  |\
            \n+------------+------------+---------------------+-------+\
            \n| 0.0        | -0.5       | 1969-12-31T23:59:55 | host1 |\
            \n| 0.0        | 0.0        | 1970-01-01T00:00:00 | host1 |\
            \n| 1.0        | 0.5        | 1970-01-01T00:00:05 | host1 |\
            \n| 1.0        | 1.0        | 1970-01-01T00:00:10 | host1 |\
            \n| 2.0        | 1.5        | 1970-01-01T00:00:15 | host1 |\
            \n| 2.0        | 2.0        | 1970-01-01T00:00:20 | host1 |\
            \n| 3.0        | 2.5        | 1969-12-31T23:59:55 | host2 |\
            \n| 3.0        | 3.0        | 1970-01-01T00:00:00 | host2 |\
            \n| 4.0        | 3.5        | 1970-01-01T00:00:05 | host2 |\
            \n| 4.0        | 4.0        | 1970-01-01T00:00:10 | host2 |\
            \n| 5.0        | 4.5        | 1970-01-01T00:00:15 | host2 |\
            \n| 5.0        | 5.0        | 1970-01-01T00:00:20 | host2 |\
            \n+------------+------------+---------------------+-------+",
        );
        do_range_select_test(
            10_000,
            5_000,
            5_000,
            Some(Fill::Linear),
            true,
            false,
            expected,
        )
        .await;
    }
1636
    #[tokio::test]
    async fn range_fill_integer_linear() {
        // Same expected table as `range_fill_linear`, but the fifth argument
        // is false — presumably selecting an integer-typed source column
        // (TODO confirm the flag's meaning in do_range_select_test). Linear
        // fill must still produce fractional Float64 results.
        let expected = String::from(
            "+------------+------------+---------------------+-------+\
            \n| MIN(value) | MAX(value) | timestamp           | host  |\
            \n+------------+------------+---------------------+-------+\
            \n| 0.0        | -0.5       | 1969-12-31T23:59:55 | host1 |\
            \n| 0.0        | 0.0        | 1970-01-01T00:00:00 | host1 |\
            \n| 1.0        | 0.5        | 1970-01-01T00:00:05 | host1 |\
            \n| 1.0        | 1.0        | 1970-01-01T00:00:10 | host1 |\
            \n| 2.0        | 1.5        | 1970-01-01T00:00:15 | host1 |\
            \n| 2.0        | 2.0        | 1970-01-01T00:00:20 | host1 |\
            \n| 3.0        | 2.5        | 1969-12-31T23:59:55 | host2 |\
            \n| 3.0        | 3.0        | 1970-01-01T00:00:00 | host2 |\
            \n| 4.0        | 3.5        | 1970-01-01T00:00:05 | host2 |\
            \n| 4.0        | 4.0        | 1970-01-01T00:00:10 | host2 |\
            \n| 5.0        | 4.5        | 1970-01-01T00:00:15 | host2 |\
            \n| 5.0        | 5.0        | 1970-01-01T00:00:20 | host2 |\
            \n+------------+------------+---------------------+-------+",
        );
        do_range_select_test(
            10_000,
            5_000,
            5_000,
            Some(Fill::Linear),
            false,
            false,
            expected,
        )
        .await;
    }
1668
    #[tokio::test]
    async fn range_fill_const() {
        // FILL with a constant: every empty slot is filled with the literal
        // 6.6, regardless of neighboring values.
        let expected = String::from(
            "+------------+------------+---------------------+-------+\
            \n| MIN(value) | MAX(value) | timestamp           | host  |\
            \n+------------+------------+---------------------+-------+\
            \n| 0.0        | 6.6        | 1969-12-31T23:59:55 | host1 |\
            \n| 0.0        | 0.0        | 1970-01-01T00:00:00 | host1 |\
            \n| 1.0        | 6.6        | 1970-01-01T00:00:05 | host1 |\
            \n| 1.0        | 1.0        | 1970-01-01T00:00:10 | host1 |\
            \n| 2.0        | 6.6        | 1970-01-01T00:00:15 | host1 |\
            \n| 2.0        | 2.0        | 1970-01-01T00:00:20 | host1 |\
            \n| 3.0        | 6.6        | 1969-12-31T23:59:55 | host2 |\
            \n| 3.0        | 3.0        | 1970-01-01T00:00:00 | host2 |\
            \n| 4.0        | 6.6        | 1970-01-01T00:00:05 | host2 |\
            \n| 4.0        | 4.0        | 1970-01-01T00:00:10 | host2 |\
            \n| 5.0        | 6.6        | 1970-01-01T00:00:15 | host2 |\
            \n| 5.0        | 5.0        | 1970-01-01T00:00:20 | host2 |\
            \n+------------+------------+---------------------+-------+",
        );
        do_range_select_test(
            10_000,
            5_000,
            5_000,
            Some(Fill::Const(ScalarValue::Float64(Some(6.6)))),
            true,
            false,
            expected,
        )
        .await;
    }
1700
    #[tokio::test]
    async fn range_fill_gap() {
        // Exercises every fill mode on data with a 15s gap between points
        // (only 00:00:00 and 00:00:15 have data per host).
        //
        // No FILL given: windows with no data are dropped from the output
        // entirely.
        let expected = String::from(
            "+------------+------------+---------------------+-------+\
            \n| MIN(value) | MAX(value) | timestamp           | host  |\
            \n+------------+------------+---------------------+-------+\
            \n| 0.0        | 0.0        | 1970-01-01T00:00:00 | host1 |\
            \n| 6.0        | 6.0        | 1970-01-01T00:00:15 | host1 |\
            \n| 6.0        | 6.0        | 1970-01-01T00:00:00 | host2 |\
            \n| 12.0       | 12.0       | 1970-01-01T00:00:15 | host2 |\
            \n+------------+------------+---------------------+-------+",
        );
        do_range_select_test(5_000, 5_000, 5_000, None, true, true, expected).await;
        // FILL NULL: the empty 00:00:05 / 00:00:10 windows are kept as
        // all-null rows.
        let expected = String::from(
            "+------------+------------+---------------------+-------+\
            \n| MIN(value) | MAX(value) | timestamp           | host  |\
            \n+------------+------------+---------------------+-------+\
            \n| 0.0        | 0.0        | 1970-01-01T00:00:00 | host1 |\
            \n|            |            | 1970-01-01T00:00:05 | host1 |\
            \n|            |            | 1970-01-01T00:00:10 | host1 |\
            \n| 6.0        | 6.0        | 1970-01-01T00:00:15 | host1 |\
            \n| 6.0        | 6.0        | 1970-01-01T00:00:00 | host2 |\
            \n|            |            | 1970-01-01T00:00:05 | host2 |\
            \n|            |            | 1970-01-01T00:00:10 | host2 |\
            \n| 12.0       | 12.0       | 1970-01-01T00:00:15 | host2 |\
            \n+------------+------------+---------------------+-------+",
        );
        do_range_select_test(5_000, 5_000, 5_000, Some(Fill::Null), true, true, expected).await;
        // FILL PREV: empty windows repeat the previous window's values.
        let expected = String::from(
            "+------------+------------+---------------------+-------+\
            \n| MIN(value) | MAX(value) | timestamp           | host  |\
            \n+------------+------------+---------------------+-------+\
            \n| 0.0        | 0.0        | 1970-01-01T00:00:00 | host1 |\
            \n| 0.0        | 0.0        | 1970-01-01T00:00:05 | host1 |\
            \n| 0.0        | 0.0        | 1970-01-01T00:00:10 | host1 |\
            \n| 6.0        | 6.0        | 1970-01-01T00:00:15 | host1 |\
            \n| 6.0        | 6.0        | 1970-01-01T00:00:00 | host2 |\
            \n| 6.0        | 6.0        | 1970-01-01T00:00:05 | host2 |\
            \n| 6.0        | 6.0        | 1970-01-01T00:00:10 | host2 |\
            \n| 12.0       | 12.0       | 1970-01-01T00:00:15 | host2 |\
            \n+------------+------------+---------------------+-------+",
        );
        do_range_select_test(5_000, 5_000, 5_000, Some(Fill::Prev), true, true, expected).await;
        // FILL LINEAR: empty windows are linearly interpolated between the
        // neighboring windows (0 -> 6 over 15s gives steps of 2 per 5s).
        let expected = String::from(
            "+------------+------------+---------------------+-------+\
            \n| MIN(value) | MAX(value) | timestamp           | host  |\
            \n+------------+------------+---------------------+-------+\
            \n| 0.0        | 0.0        | 1970-01-01T00:00:00 | host1 |\
            \n| 2.0        | 2.0        | 1970-01-01T00:00:05 | host1 |\
            \n| 4.0        | 4.0        | 1970-01-01T00:00:10 | host1 |\
            \n| 6.0        | 6.0        | 1970-01-01T00:00:15 | host1 |\
            \n| 6.0        | 6.0        | 1970-01-01T00:00:00 | host2 |\
            \n| 8.0        | 8.0        | 1970-01-01T00:00:05 | host2 |\
            \n| 10.0       | 10.0       | 1970-01-01T00:00:10 | host2 |\
            \n| 12.0       | 12.0       | 1970-01-01T00:00:15 | host2 |\
            \n+------------+------------+---------------------+-------+",
        );
        do_range_select_test(
            5_000,
            5_000,
            5_000,
            Some(Fill::Linear),
            true,
            true,
            expected,
        )
        .await;
        // FILL with constant 6.0: empty windows are filled with the literal.
        let expected = String::from(
            "+------------+------------+---------------------+-------+\
            \n| MIN(value) | MAX(value) | timestamp           | host  |\
            \n+------------+------------+---------------------+-------+\
            \n| 0.0        | 0.0        | 1970-01-01T00:00:00 | host1 |\
            \n| 6.0        | 6.0        | 1970-01-01T00:00:05 | host1 |\
            \n| 6.0        | 6.0        | 1970-01-01T00:00:10 | host1 |\
            \n| 6.0        | 6.0        | 1970-01-01T00:00:15 | host1 |\
            \n| 6.0        | 6.0        | 1970-01-01T00:00:00 | host2 |\
            \n| 6.0        | 6.0        | 1970-01-01T00:00:05 | host2 |\
            \n| 6.0        | 6.0        | 1970-01-01T00:00:10 | host2 |\
            \n| 12.0       | 12.0       | 1970-01-01T00:00:15 | host2 |\
            \n+------------+------------+---------------------+-------+",
        );
        do_range_select_test(
            5_000,
            5_000,
            5_000,
            Some(Fill::Const(ScalarValue::Float64(Some(6.0)))),
            true,
            true,
            expected,
        )
        .await;
    }
1793
1794    #[tokio::test]
1795    async fn range_select_respects_session_batch_size() {
1796        let result =
1797            collect_range_select_test(10_000, 5_000, 5_000, Some(Fill::Null), true, false, 3).await;
1798
1799        let row_counts = result
1800            .iter()
1801            .map(|batch| batch.num_rows())
1802            .collect::<Vec<_>>();
1803        assert_eq!(vec![3, 3, 3, 3], row_counts);
1804    }
1805
    #[tokio::test]
    async fn range_select_skips_empty_output_batch() {
        // Builds a RangeSelectExec directly over an empty input and asserts
        // that collecting it yields NO record batches at all, i.e. the
        // operator does not emit empty batches.
        let memory_exec = Arc::new(prepare_empty_test_data(true));
        // Output schema: the two aggregates, the time index, and the `by`
        // column.
        let schema = Arc::new(Schema::new(vec![
            Field::new("MIN(value)", DataType::Float64, true),
            Field::new("MAX(value)", DataType::Float64, true),
            Field::new(TIME_INDEX_COLUMN, TimestampMillisecondType::DATA_TYPE, true),
            Field::new("host", DataType::Utf8, true),
        ]));
        let cache = Arc::new(PlanProperties::new(
            EquivalenceProperties::new(schema.clone()),
            Partitioning::UnknownPartitioning(1),
            EmissionType::Incremental,
            Boundedness::Bounded,
        ));
        let input_schema = memory_exec.schema().clone();
        let range_select_exec = Arc::new(RangeSelectExec {
            input: memory_exec,
            // Two range aggregates (MIN with a 10s range, MAX with a 5s
            // range) over the same `value` column, both with FILL NULL.
            range_exec: vec![
                RangeFnExec {
                    expr: Arc::new(
                        AggregateExprBuilder::new(
                            min_max::min_udaf(),
                            vec![Arc::new(Column::new("value", 1))],
                        )
                        .schema(input_schema.clone())
                        .alias("MIN(value)")
                        .build()
                        .unwrap(),
                    ),
                    range: 10_000,
                    fill: Some(Fill::Null),
                    need_cast: None,
                },
                RangeFnExec {
                    expr: Arc::new(
                        AggregateExprBuilder::new(
                            min_max::max_udaf(),
                            vec![Arc::new(Column::new("value", 1))],
                        )
                        .schema(input_schema)
                        .alias("MAX(value)")
                        .build()
                        .unwrap(),
                    ),
                    range: 5_000,
                    fill: Some(Fill::Null),
                    need_cast: None,
                },
            ],
            align: 5_000,
            align_to: 0,
            // Group series by the `host` column (index 2 in the input).
            by: vec![Arc::new(Column::new("host", 2))],
            time_index: TIME_INDEX_COLUMN.to_string(),
            // No projection: schema before and after projecting are the same.
            schema: schema.clone(),
            schema_before_project: schema.clone(),
            schema_project: None,
            by_schema: Arc::new(Schema::new(vec![Field::new("host", DataType::Utf8, true)])),
            metric: ExecutionPlanMetricsSet::new(),
            cache,
        });
        let session_context = SessionContext::new();
        let result =
            datafusion::physical_plan::collect(range_select_exec, session_context.task_ctx())
                .await
                .unwrap();

        // Empty input must produce zero batches, not one empty batch.
        assert!(result.is_empty());
    }
1875
1876    #[test]
1877    fn fill_test() {
1878        assert!(Fill::try_from_str("", &DataType::UInt8).unwrap().is_none());
1879        assert!(Fill::try_from_str("Linear", &DataType::UInt8).unwrap() == Some(Fill::Linear));
1880        assert_eq!(
1881            Fill::try_from_str("Linear", &DataType::Boolean)
1882                .unwrap_err()
1883                .to_string(),
1884            "Error during planning: Use FILL LINEAR on Non-numeric DataType Boolean"
1885        );
1886        assert_eq!(
1887            Fill::try_from_str("WHAT", &DataType::UInt8)
1888                .unwrap_err()
1889                .to_string(),
1890            "Error during planning: WHAT is not a valid fill option, fail to convert to a const value. { Arrow error: Cast error: Cannot cast string 'WHAT' to value of UInt8 type }"
1891        );
1892        assert_eq!(
1893            Fill::try_from_str("8.0", &DataType::UInt8)
1894                .unwrap_err()
1895                .to_string(),
1896            "Error during planning: 8.0 is not a valid fill option, fail to convert to a const value. { Arrow error: Cast error: Cannot cast string '8.0' to value of UInt8 type }"
1897        );
1898        assert!(
1899            Fill::try_from_str("8", &DataType::UInt8).unwrap()
1900                == Some(Fill::Const(ScalarValue::UInt8(Some(8))))
1901        );
1902        let mut test1 = vec![
1903            ScalarValue::UInt8(Some(8)),
1904            ScalarValue::UInt8(None),
1905            ScalarValue::UInt8(Some(9)),
1906        ];
1907        Fill::Null.apply_fill_strategy(&[], &mut test1).unwrap();
1908        assert_eq!(test1[1], ScalarValue::UInt8(None));
1909        Fill::Prev.apply_fill_strategy(&[], &mut test1).unwrap();
1910        assert_eq!(test1[1], ScalarValue::UInt8(Some(8)));
1911        test1[1] = ScalarValue::UInt8(None);
1912        Fill::Const(ScalarValue::UInt8(Some(10)))
1913            .apply_fill_strategy(&[], &mut test1)
1914            .unwrap();
1915        assert_eq!(test1[1], ScalarValue::UInt8(Some(10)));
1916    }
1917
1918    #[test]
1919    fn test_fill_linear() {
1920        let ts = vec![1, 2, 3, 4, 5];
1921        let mut test = vec![
1922            ScalarValue::Float32(Some(1.0)),
1923            ScalarValue::Float32(None),
1924            ScalarValue::Float32(Some(3.0)),
1925            ScalarValue::Float32(None),
1926            ScalarValue::Float32(Some(5.0)),
1927        ];
1928        Fill::Linear.apply_fill_strategy(&ts, &mut test).unwrap();
1929        let mut test1 = vec![
1930            ScalarValue::Float32(None),
1931            ScalarValue::Float32(Some(2.0)),
1932            ScalarValue::Float32(None),
1933            ScalarValue::Float32(Some(4.0)),
1934            ScalarValue::Float32(None),
1935        ];
1936        Fill::Linear.apply_fill_strategy(&ts, &mut test1).unwrap();
1937        assert_eq!(test, test1);
1938        // test linear interpolation on irregularly spaced ts/data
1939        let ts = vec![
1940            1,   // None
1941            3,   // 1.0
1942            8,   // 11.0
1943            30,  // None
1944            88,  // 10.0
1945            108, // 5.0
1946            128, // None
1947        ];
1948        let mut test = vec![
1949            ScalarValue::Float64(None),
1950            ScalarValue::Float64(Some(1.0)),
1951            ScalarValue::Float64(Some(11.0)),
1952            ScalarValue::Float64(None),
1953            ScalarValue::Float64(Some(10.0)),
1954            ScalarValue::Float64(Some(5.0)),
1955            ScalarValue::Float64(None),
1956        ];
1957        Fill::Linear.apply_fill_strategy(&ts, &mut test).unwrap();
1958        let data: Vec<_> = test
1959            .into_iter()
1960            .map(|x| {
1961                let ScalarValue::Float64(Some(f)) = x else {
1962                    unreachable!()
1963                };
1964                f
1965            })
1966            .collect();
1967        assert_eq!(data, vec![-3.0, 1.0, 11.0, 10.725, 10.0, 5.0, 0.0]);
1968        // test corner case
1969        let ts = vec![1];
1970        let test = vec![ScalarValue::Float32(None)];
1971        let mut test1 = test.clone();
1972        Fill::Linear.apply_fill_strategy(&ts, &mut test1).unwrap();
1973        assert_eq!(test, test1);
1974    }
1975}