query/range_select/
plan_rewrite.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::BTreeSet;
16use std::sync::Arc;
17use std::time::Duration;
18
19use arrow_schema::DataType;
20use async_recursion::async_recursion;
21use catalog::table_source::DfTableSourceProvider;
22use common_time::interval::{MS_PER_DAY, NANOS_PER_MILLI};
23use common_time::timestamp::TimeUnit;
24use common_time::{IntervalDayTime, IntervalMonthDayNano, IntervalYearMonth, Timestamp, Timezone};
25use datafusion::datasource::DefaultTableSource;
26use datafusion::prelude::Column;
27use datafusion::scalar::ScalarValue;
28use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRecursion, TreeNodeRewriter};
29use datafusion_common::{DFSchema, DataFusionError, Result as DFResult};
30use datafusion_expr::expr::WildcardOptions;
31use datafusion_expr::simplify::SimplifyContext;
32use datafusion_expr::{
33    Aggregate, Analyze, Cast, Distinct, DistinctOn, Explain, Expr, ExprSchemable, Extension,
34    Literal, LogicalPlan, LogicalPlanBuilder, Projection,
35};
36use datafusion_optimizer::simplify_expressions::ExprSimplifier;
37use datatypes::prelude::ConcreteDataType;
38use promql_parser::util::parse_duration;
39use session::context::QueryContextRef;
40use snafu::{OptionExt, ResultExt, ensure};
41use table::table::adapter::DfTableProviderAdapter;
42
43use crate::error::{
44    CatalogSnafu, RangeQuerySnafu, Result, TimeIndexNotFoundSnafu, UnknownTableSnafu,
45};
46use crate::plan::ExtractExpr;
47use crate::range_select::plan::{Fill, RangeFn, RangeSelect};
48
/// `RangeExprRewriter` will recursively search certain `Expr`, find all `range_fn` scalar udf contained in `Expr`,
/// and collect the information required by the RangeSelect query,
/// and finally modify the `range_fn` scalar udf to an ordinary column field.
pub struct RangeExprRewriter<'a> {
    /// Plan whose schema is used to resolve the data type of each range expression.
    input_plan: &'a Arc<LogicalPlan>,
    /// Align duration parsed from the `range_fn` arguments.
    align: Duration,
    /// Align-to basis parsed from the `range_fn` arguments (a millisecond value).
    align_to: i64,
    /// `by` expressions parsed from the variadic `range_fn` arguments.
    by: Vec<Expr>,
    /// Use `BTreeSet` to avoid in case like `avg(a) RANGE '5m' + avg(a) RANGE '5m'`, duplicate range expr `avg(a) RANGE '5m'` be calculate twice
    range_fn: BTreeSet<RangeFn>,
    /// Aggregate plan under the projection; column arguments of `range_fn` are
    /// mapped back to its `aggr_expr` entries.
    sub_aggr: &'a Aggregate,
    /// Query context; its timezone is used when parsing the `align to` argument.
    query_ctx: &'a QueryContextRef,
}
62
63impl RangeExprRewriter<'_> {
64    pub fn get_range_expr(&self, args: &[Expr], i: usize) -> DFResult<Expr> {
65        match args.get(i) {
66            Some(Expr::Column(column)) => {
67                let index = self.sub_aggr.schema.index_of_column(column)?;
68                let len = self.sub_aggr.group_expr.len();
69                self.sub_aggr
70                    .aggr_expr
71                    .get(index - len)
72                    .cloned()
73                    .ok_or(DataFusionError::Plan(
74                        "Range expr not found in underlying Aggregate Plan".into(),
75                    ))
76            }
77            Some(Expr::Alias(alias)) => {
78                self.get_range_expr(std::slice::from_ref(alias.expr.as_ref()), 0)
79            }
80            other => Err(dispose_parse_error(other)),
81        }
82    }
83}
84
85#[inline]
86fn dispose_parse_error(expr: Option<&Expr>) -> DataFusionError {
87    DataFusionError::Plan(
88        expr.map(|x| {
89            format!(
90                "Illegal argument `{}` in range select query",
91                x.schema_name()
92            )
93        })
94        .unwrap_or("Missing argument in range select query".into()),
95    )
96}
97
98fn parse_str_expr(args: &[Expr], i: usize) -> DFResult<&str> {
99    match args.get(i) {
100        Some(Expr::Literal(ScalarValue::Utf8(Some(str)), _)) => Ok(str.as_str()),
101        other => Err(dispose_parse_error(other)),
102    }
103}
104
105fn parse_expr_to_string(args: &[Expr], i: usize) -> DFResult<String> {
106    match args.get(i) {
107        Some(Expr::Literal(ScalarValue::Utf8(Some(str)), _)) => Ok(str.clone()),
108        Some(expr) => Ok(expr.schema_name().to_string()),
109        None => Err(dispose_parse_error(None)),
110    }
111}
112
113/// Parse a duraion expr:
114/// 1. duration string (e.g. `'1h'`)
115/// 2. Interval expr (e.g. `INTERVAL '1 year 3 hours 20 minutes'`)
116/// 3. An interval expr can be evaluated at the logical plan stage (e.g. `INTERVAL '2' day - INTERVAL '1' day`)
117fn parse_duration_expr(args: &[Expr], i: usize) -> DFResult<Duration> {
118    match args.get(i) {
119        Some(Expr::Literal(ScalarValue::Utf8(Some(str)), _)) => {
120            parse_duration(str).map_err(DataFusionError::Plan)
121        }
122        Some(expr) => {
123            let ms = evaluate_expr_to_millisecond(args, i, true)?;
124            if ms <= 0 {
125                return Err(dispose_parse_error(Some(expr)));
126            }
127            Ok(Duration::from_millis(ms as u64))
128        }
129        None => Err(dispose_parse_error(None)),
130    }
131}
132
133/// Evaluate a time calculation expr, case like:
134/// 1. `INTERVAL '1' day + INTERVAL '1 year 2 hours 3 minutes'`
135/// 2. `now() - INTERVAL '1' day` (when `interval_only==false`)
136///
137/// Output a millisecond timestamp
138///
139/// if `interval_only==true`, only accept expr with all interval type (case 2 will return a error)
140fn evaluate_expr_to_millisecond(args: &[Expr], i: usize, interval_only: bool) -> DFResult<i64> {
141    let Some(expr) = args.get(i) else {
142        return Err(dispose_parse_error(None));
143    };
144    if interval_only && !interval_only_in_expr(expr) {
145        return Err(dispose_parse_error(Some(expr)));
146    }
147    let info = SimplifyContext::default().with_current_time();
148    let simplify_expr = ExprSimplifier::new(info).simplify(expr.clone())?;
149    match simplify_expr {
150        Expr::Literal(ScalarValue::TimestampNanosecond(ts_nanos, _), _)
151        | Expr::Literal(ScalarValue::DurationNanosecond(ts_nanos), _) => {
152            ts_nanos.map(|v| v / 1_000_000)
153        }
154        Expr::Literal(ScalarValue::TimestampMicrosecond(ts_micros, _), _)
155        | Expr::Literal(ScalarValue::DurationMicrosecond(ts_micros), _) => {
156            ts_micros.map(|v| v / 1_000)
157        }
158        Expr::Literal(ScalarValue::TimestampMillisecond(ts_millis, _), _)
159        | Expr::Literal(ScalarValue::DurationMillisecond(ts_millis), _) => ts_millis,
160        Expr::Literal(ScalarValue::TimestampSecond(ts_secs, _), _)
161        | Expr::Literal(ScalarValue::DurationSecond(ts_secs), _) => ts_secs.map(|v| v * 1_000),
162        // We don't support interval with months as days in a month is unclear.
163        Expr::Literal(ScalarValue::IntervalYearMonth(interval), _) => interval
164            .map(|v| {
165                let interval = IntervalYearMonth::from_i32(v);
166                if interval.months != 0 {
167                    return Err(DataFusionError::Plan(format!(
168                        "Year or month interval is not allowed in range query: {}",
169                        expr.schema_name()
170                    )));
171                }
172
173                Ok(0)
174            })
175            .transpose()?,
176        Expr::Literal(ScalarValue::IntervalDayTime(interval), _) => interval.map(|v| {
177            let interval = IntervalDayTime::from(v);
178            interval.as_millis()
179        }),
180        Expr::Literal(ScalarValue::IntervalMonthDayNano(interval), _) => interval
181            .map(|v| {
182                let interval = IntervalMonthDayNano::from(v);
183                if interval.months != 0 {
184                    return Err(DataFusionError::Plan(format!(
185                        "Year or month interval is not allowed in range query: {}",
186                        expr.schema_name()
187                    )));
188                }
189
190                Ok(interval.days as i64 * MS_PER_DAY + interval.nanoseconds / NANOS_PER_MILLI)
191            })
192            .transpose()?,
193        _ => None,
194    }
195    .ok_or_else(|| {
196        DataFusionError::Plan(format!(
197            "{} is not a expr can be evaluate and use in range query",
198            expr.schema_name()
199        ))
200    })
201}
202
203/// Parse the `align to` clause and return a UTC timestamp with unit of millisecond,
204/// which is used as the basis for dividing time slot during the align operation.
205/// 1. NOW: align to current execute time
206/// 2. Timestamp string: align to specific timestamp
207/// 3. An expr can be evaluated at the logical plan stage (e.g. `now() - INTERVAL '1' day`)
208/// 4. leave empty (as Default Option): align to unix epoch 0 (timezone aware)
209fn parse_align_to(args: &[Expr], i: usize, timezone: Option<&Timezone>) -> DFResult<i64> {
210    let Ok(s) = parse_str_expr(args, i) else {
211        return evaluate_expr_to_millisecond(args, i, false);
212    };
213    let upper = s.to_uppercase();
214    match upper.as_str() {
215        "NOW" => return Ok(Timestamp::current_millis().value()),
216        // default align to unix epoch 0 (timezone aware)
217        "" => return Ok(timezone.map(|tz| tz.local_minus_utc() * 1000).unwrap_or(0)),
218        _ => (),
219    }
220
221    Timestamp::from_str(s, timezone)
222        .map_err(|e| {
223            DataFusionError::Plan(format!(
224                "Illegal `align to` argument `{}` in range select query, can't be parse as NOW/CALENDAR/Timestamp, error: {}",
225                s, e
226            ))
227        })?.convert_to(TimeUnit::Millisecond).map(|x|x.value()).ok_or(DataFusionError::Plan(format!(
228            "Illegal `align to` argument `{}` in range select query, can't be convert to a valid Timestamp",
229            s
230        ))
231        )
232}
233
234fn parse_expr_list(args: &[Expr], start: usize, len: usize) -> DFResult<Vec<Expr>> {
235    let mut outs = Vec::with_capacity(len);
236    for i in start..start + len {
237        outs.push(match &args.get(i) {
238            Some(
239                Expr::Column(_)
240                | Expr::Literal(_, _)
241                | Expr::BinaryExpr(_)
242                | Expr::ScalarFunction(_),
243            ) => args[i].clone(),
244            Some(Expr::Alias(alias)) if matches!(*alias.expr, Expr::ScalarFunction(_)) => {
245                args[i].clone()
246            }
247            other => {
248                return Err(dispose_parse_error(*other));
249            }
250        });
251    }
252    Ok(outs)
253}
254
/// Stores the local variable `$name` into the field `$self.$name`, rejecting
/// conflicting values.
///
/// The call site must have a local variable with the same name as the field.
/// When `$cond` is true and the field differs from the local variable, the
/// enclosing function returns a `DataFusionError::Plan` naming the field.
/// In every other case the local variable is moved into the field.
macro_rules! inconsistent_check {
    ($self: ident.$name: ident, $cond: expr) => {
        if $cond && $self.$name != $name {
            return Err(DataFusionError::Plan(
                concat!(
                    "Inconsistent ",
                    stringify!($name),
                    " given in Range Function Rewrite"
                )
                .into(),
            ));
        } else {
            // Either nothing was recorded yet (`$cond` is false) or the values agree.
            $self.$name = $name;
        }
    };
}
271
272impl TreeNodeRewriter for RangeExprRewriter<'_> {
273    type Node = Expr;
274
275    fn f_down(&mut self, node: Expr) -> DFResult<Transformed<Expr>> {
276        if let Expr::ScalarFunction(func) = &node
277            && func.name() == "range_fn"
278        {
279            // `range_fn(func, range, fill, byc, [byv], align, to)`
280            // `[byv]` are variadic arguments, byc indicate the length of arguments
281            let range_expr = self.get_range_expr(&func.args, 0)?;
282            let range = parse_duration_expr(&func.args, 1)?;
283            let byc = str::parse::<usize>(parse_str_expr(&func.args, 3)?)
284                .map_err(|e| DataFusionError::Plan(e.to_string()))?;
285            let by = parse_expr_list(&func.args, 4, byc)?;
286            let align = parse_duration_expr(&func.args, byc + 4)?;
287            let align_to = parse_align_to(&func.args, byc + 5, Some(&self.query_ctx.timezone()))?;
288            let mut data_type = range_expr.get_type(self.input_plan.schema())?;
289            let mut need_cast = false;
290            let fill = Fill::try_from_str(parse_str_expr(&func.args, 2)?, &data_type)?;
291            if matches!(fill, Some(Fill::Linear)) && data_type.is_integer() {
292                data_type = DataType::Float64;
293                need_cast = true;
294            }
295            inconsistent_check!(self.by, !self.by.is_empty());
296            inconsistent_check!(self.align, self.align != Duration::default());
297            inconsistent_check!(self.align_to, self.align_to != 0);
298            let range_fn = RangeFn {
299                name: if let Some(fill) = &fill {
300                    format!(
301                        "{} RANGE {} FILL {}",
302                        range_expr.schema_name(),
303                        parse_expr_to_string(&func.args, 1)?,
304                        fill
305                    )
306                } else {
307                    format!(
308                        "{} RANGE {}",
309                        range_expr.schema_name(),
310                        parse_expr_to_string(&func.args, 1)?,
311                    )
312                },
313                data_type,
314                expr: range_expr,
315                range,
316                fill,
317                need_cast,
318            };
319            let alias = Expr::Column(Column::from_name(range_fn.name.clone()));
320            self.range_fn.insert(range_fn);
321            return Ok(Transformed::yes(alias));
322        }
323        Ok(Transformed::no(node))
324    }
325}
326
/// In order to implement RangeSelect query like `avg(field_0) RANGE '5m' FILL NULL`,
/// all RangeSelect query items are converted into udf scalar function in sql parse stage, with format like `range_fn(avg(field_0), .....)`.
/// `range_fn` contains all the parameters we need to execute RangeSelect.
/// In order to correctly execute the query process of range select, we need to modify the query plan generated by datafusion.
/// We need to recursively search the entire LogicalPlan, and find all `range_fn` scalar udf contained in the project plan,
/// collecting info we need to generate RangeSelect Query LogicalPlan and rewrite the original LogicalPlan.
pub struct RangePlanRewriter {
    /// Used to resolve the tables referenced by the input schema when looking
    /// up the time index and the default `by` columns.
    table_provider: DfTableSourceProvider,
    /// Query context handed to `RangeExprRewriter` while rewriting expressions.
    query_ctx: QueryContextRef,
}
337
338impl RangePlanRewriter {
339    pub fn new(table_provider: DfTableSourceProvider, query_ctx: QueryContextRef) -> Self {
340        Self {
341            table_provider,
342            query_ctx,
343        }
344    }
345
346    pub async fn rewrite(&mut self, plan: LogicalPlan) -> Result<LogicalPlan> {
347        match self.rewrite_logical_plan(&plan).await? {
348            Some(new_plan) => Ok(new_plan),
349            None => Ok(plan),
350        }
351    }
352
353    #[async_recursion]
354    async fn rewrite_logical_plan(&mut self, plan: &LogicalPlan) -> Result<Option<LogicalPlan>> {
355        let inputs = plan.inputs();
356        let mut new_inputs = Vec::with_capacity(inputs.len());
357        for input in &inputs {
358            new_inputs.push(self.rewrite_logical_plan(input).await?)
359        }
360        match plan {
361            LogicalPlan::Projection(Projection { expr, input, .. })
362                if have_range_in_exprs(expr) =>
363            {
364                let (aggr_plan, input) = if let LogicalPlan::Aggregate(aggr) = input.as_ref() {
365                    // Expr like `rate(max(a) RANGE '6m') RANGE '6m'` have legal syntax but illegal semantic.
366                    if have_range_in_exprs(&aggr.aggr_expr) {
367                        return RangeQuerySnafu {
368                            msg: "Nest Range Query is not allowed",
369                        }
370                        .fail();
371                    }
372                    (aggr, aggr.input.clone())
373                } else {
374                    return RangeQuerySnafu {
375                        msg: "Window functions is not allowed in Range Query",
376                    }
377                    .fail();
378                };
379                let (time_index, default_by) = self.get_index_by(input.schema()).await?;
380                let mut range_rewriter = RangeExprRewriter {
381                    input_plan: &input,
382                    align: Duration::default(),
383                    align_to: 0,
384                    by: vec![],
385                    range_fn: BTreeSet::new(),
386                    sub_aggr: aggr_plan,
387                    query_ctx: &self.query_ctx,
388                };
389                let new_expr = expr
390                    .iter()
391                    .map(|expr| expr.clone().rewrite(&mut range_rewriter).map(|x| x.data))
392                    .collect::<DFResult<Vec<_>>>()?;
393                if range_rewriter.by.is_empty() {
394                    range_rewriter.by = default_by;
395                }
396                let range_select = RangeSelect::try_new(
397                    input.clone(),
398                    range_rewriter.range_fn.into_iter().collect(),
399                    range_rewriter.align,
400                    range_rewriter.align_to,
401                    time_index,
402                    range_rewriter.by,
403                    &new_expr,
404                )?;
405                let no_additional_project = range_select.schema_project.is_some();
406                let range_plan = LogicalPlan::Extension(Extension {
407                    node: Arc::new(range_select),
408                });
409                if no_additional_project {
410                    Ok(Some(range_plan))
411                } else {
412                    let project_plan = LogicalPlanBuilder::from(range_plan)
413                        .project(new_expr)
414                        .and_then(|x| x.build())?;
415                    Ok(Some(project_plan))
416                }
417            }
418            _ => {
419                if new_inputs.iter().any(|x| x.is_some()) {
420                    let inputs: Vec<LogicalPlan> = new_inputs
421                        .into_iter()
422                        .zip(inputs)
423                        .map(|(x, y)| match x {
424                            Some(plan) => plan,
425                            None => y.clone(),
426                        })
427                        .collect();
428                    // Due to the limitations of Datafusion, for `LogicalPlan::Analyze` and `LogicalPlan::Explain`,
429                    // directly using the method `with_new_inputs` to rebuild a new `LogicalPlan` will cause an error,
430                    // so here we directly use the `LogicalPlanBuilder` to build a new plan.
431                    let plan = match plan {
432                        LogicalPlan::Analyze(Analyze { verbose, .. }) => {
433                            ensure!(
434                                inputs.len() == 1,
435                                RangeQuerySnafu {
436                                    msg: "Illegal subplan nums when rewrite Analyze logical plan",
437                                }
438                            );
439                            LogicalPlanBuilder::from(inputs[0].clone())
440                                .explain(*verbose, true)?
441                                .build()
442                        }
443                        LogicalPlan::Explain(Explain { verbose, .. }) => {
444                            ensure!(
445                                inputs.len() == 1,
446                                RangeQuerySnafu {
447                                    msg: "Illegal subplan nums when rewrite Explain logical plan",
448                                }
449                            );
450                            LogicalPlanBuilder::from(inputs[0].clone())
451                                .explain(*verbose, false)?
452                                .build()
453                        }
454                        LogicalPlan::Distinct(Distinct::On(DistinctOn {
455                            on_expr,
456                            select_expr,
457                            sort_expr,
458                            ..
459                        })) => {
460                            ensure!(
461                                inputs.len() == 1,
462                                RangeQuerySnafu {
463                                    msg: "Illegal subplan nums when rewrite DistinctOn logical plan",
464                                }
465                            );
466                            LogicalPlanBuilder::from(inputs[0].clone())
467                                .distinct_on(
468                                    on_expr.clone(),
469                                    select_expr.clone(),
470                                    sort_expr.clone(),
471                                )?
472                                .build()
473                        }
474                        _ => plan.with_new_exprs(plan.expressions_consider_join(), inputs),
475                    }?;
476                    Ok(Some(plan))
477                } else {
478                    Ok(None)
479                }
480            }
481        }
482    }
483
484    /// this function use to find the time_index column and row columns from input schema,
485    /// return `(time_index, [row_columns])` to the rewriter.
486    /// If the user does not explicitly use the `by` keyword to indicate time series,
487    /// `[row_columns]` will be use as default time series
488    async fn get_index_by(&mut self, schema: &Arc<DFSchema>) -> Result<(Expr, Vec<Expr>)> {
489        #[allow(deprecated)]
490        let mut time_index_expr = Expr::Wildcard {
491            qualifier: None,
492            options: Box::new(WildcardOptions::default()),
493        };
494        let mut default_by = vec![];
495        for i in 0..schema.fields().len() {
496            let (qualifier, _) = schema.qualified_field(i);
497            if let Some(table_ref) = qualifier {
498                let table = self
499                    .table_provider
500                    .resolve_table(table_ref.clone())
501                    .await
502                    .context(CatalogSnafu)?
503                    .as_any()
504                    .downcast_ref::<DefaultTableSource>()
505                    .context(UnknownTableSnafu)?
506                    .table_provider
507                    .as_any()
508                    .downcast_ref::<DfTableProviderAdapter>()
509                    .context(UnknownTableSnafu)?
510                    .table();
511                let schema = table.schema();
512                let time_index_column =
513                    schema
514                        .timestamp_column()
515                        .with_context(|| TimeIndexNotFoundSnafu {
516                            table: table_ref.to_string(),
517                        })?;
518                // assert time_index's datatype is timestamp
519                if let ConcreteDataType::Timestamp(_) = time_index_column.data_type {
520                    default_by = table
521                        .table_info()
522                        .meta
523                        .row_key_column_names()
524                        .map(|key| Expr::Column(Column::new(Some(table_ref.clone()), key)))
525                        .collect();
526                    // If the user does not specify a primary key when creating a table,
527                    // then by default all data will be aggregated into one time series,
528                    // which is equivalent to using `by(1)` in SQL
529                    if default_by.is_empty() {
530                        default_by = vec![1.lit()];
531                    }
532                    time_index_expr = Expr::Column(Column::new(
533                        Some(table_ref.clone()),
534                        time_index_column.name.clone(),
535                    ));
536                }
537            }
538        }
539        #[allow(deprecated)]
540        if matches!(time_index_expr, Expr::Wildcard { .. }) {
541            TimeIndexNotFoundSnafu {
542                table: schema.to_string(),
543            }
544            .fail()
545        } else {
546            Ok((time_index_expr, default_by))
547        }
548    }
549}
550
551fn have_range_in_exprs(exprs: &[Expr]) -> bool {
552    exprs.iter().any(|expr| {
553        let mut find_range = false;
554        let _ = expr.apply(|expr| {
555            Ok(match expr {
556                Expr::ScalarFunction(func) if func.name() == "range_fn" => {
557                    find_range = true;
558                    TreeNodeRecursion::Stop
559                }
560                _ => TreeNodeRecursion::Continue,
561            })
562        });
563        find_range
564    })
565}
566
567fn interval_only_in_expr(expr: &Expr) -> bool {
568    let mut all_interval = true;
569    let _ = expr.apply(|expr| {
570        // A cast expression for an interval.
571        if matches!(
572            expr,
573            Expr::Cast(Cast{
574                expr,
575                data_type: DataType::Interval(_)
576            }) if matches!(&**expr, Expr::Literal(ScalarValue::Utf8(_), _))
577        ) {
578            // Stop checking the sub `expr`,
579            // which is a `Utf8` type and has already been tested above.
580            return Ok(TreeNodeRecursion::Stop);
581        }
582
583        if !matches!(
584            expr,
585            Expr::Literal(ScalarValue::IntervalDayTime(_), _)
586                | Expr::Literal(ScalarValue::IntervalMonthDayNano(_), _)
587                | Expr::Literal(ScalarValue::IntervalYearMonth(_), _)
588                | Expr::BinaryExpr(_)
589                | Expr::Cast(Cast {
590                    data_type: DataType::Interval(_),
591                    ..
592                })
593        ) {
594            all_interval = false;
595            Ok(TreeNodeRecursion::Stop)
596        } else {
597            Ok(TreeNodeRecursion::Continue)
598        }
599    });
600
601    all_interval
602}
603
604#[cfg(test)]
605mod test {
606
607    use arrow::datatypes::IntervalUnit;
608    use catalog::RegisterTableRequest;
609    use catalog::memory::MemoryCatalogManager;
610    use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
611    use common_time::IntervalYearMonth;
612    use datafusion_expr::{BinaryExpr, Literal, Operator};
613    use datatypes::prelude::ConcreteDataType;
614    use datatypes::schema::{ColumnSchema, Schema};
615    use session::context::QueryContext;
616    use table::metadata::{TableInfoBuilder, TableMetaBuilder};
617    use table::test_util::EmptyTable;
618
619    use super::*;
620    use crate::options::QueryOptions;
621    use crate::parser::QueryLanguageParser;
622    use crate::{QueryEngineFactory, QueryEngineRef};
623
624    async fn create_test_engine() -> QueryEngineRef {
625        let table_name = "test".to_string();
626        let mut columns = vec![];
627        for i in 0..5 {
628            columns.push(ColumnSchema::new(
629                format!("tag_{i}"),
630                ConcreteDataType::string_datatype(),
631                false,
632            ));
633        }
634        columns.push(
635            ColumnSchema::new(
636                "timestamp".to_string(),
637                ConcreteDataType::timestamp_millisecond_datatype(),
638                false,
639            )
640            .with_time_index(true),
641        );
642        for i in 0..5 {
643            columns.push(ColumnSchema::new(
644                format!("field_{i}"),
645                ConcreteDataType::float64_datatype(),
646                true,
647            ));
648        }
649        let schema = Arc::new(Schema::new(columns));
650        let table_meta = TableMetaBuilder::empty()
651            .schema(schema)
652            .primary_key_indices((0..5).collect())
653            .value_indices((6..11).collect())
654            .next_column_id(1024)
655            .build()
656            .unwrap();
657        let table_info = TableInfoBuilder::default()
658            .name(&table_name)
659            .meta(table_meta)
660            .build()
661            .unwrap();
662        let table = EmptyTable::from_table_info(&table_info);
663        let catalog_list = MemoryCatalogManager::with_default_setup();
664        assert!(
665            catalog_list
666                .register_table_sync(RegisterTableRequest {
667                    catalog: DEFAULT_CATALOG_NAME.to_string(),
668                    schema: DEFAULT_SCHEMA_NAME.to_string(),
669                    table_name,
670                    table_id: 1024,
671                    table,
672                })
673                .is_ok()
674        );
675        QueryEngineFactory::new(
676            catalog_list,
677            None,
678            None,
679            None,
680            None,
681            false,
682            QueryOptions::default(),
683        )
684        .query_engine()
685    }
686
687    async fn do_query(sql: &str) -> Result<LogicalPlan> {
688        let stmt = QueryLanguageParser::parse_sql(sql, &QueryContext::arc()).unwrap();
689        let engine = create_test_engine().await;
690        engine.planner().plan(&stmt, QueryContext::arc()).await
691    }
692
693    async fn query_plan_compare(sql: &str, expected: String) {
694        let plan = do_query(sql).await.unwrap();
695        assert_eq!(plan.display_indent_schema().to_string(), expected);
696    }
697
    /// A range query whose select list is plain columns plus one range
    /// expression: the expected plan is a `RangeSelect` directly over the
    /// table scan, with no extra projection on top.
    #[tokio::test]
    async fn range_no_project() {
        let query = r#"SELECT timestamp, tag_0, tag_1, avg(field_0 + field_1) RANGE '5m' FROM test ALIGN '1h' by (tag_0,tag_1);"#;
        let expected = String::from(
            "RangeSelect: range_exprs=[avg(test.field_0 + test.field_1) RANGE 5m], align=3600000ms, align_to=0ms, align_by=[test.tag_0, test.tag_1], time_index=timestamp [timestamp:Timestamp(ms), tag_0:Utf8, tag_1:Utf8, avg(test.field_0 + test.field_1) RANGE 5m:Float64;N]\
            \n  TableScan: test [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8, timestamp:Timestamp(ms), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N, field_3:Float64;N, field_4:Float64;N]",
        );
        query_plan_compare(query, expected).await;
    }
707
    /// A range expression wrapped in arithmetic: the expected plan keeps the
    /// division in a projection placed on top of the `RangeSelect` node.
    #[tokio::test]
    async fn range_expr_calculation() {
        let query = r#"SELECT (avg(field_0 + field_1)/4) RANGE '5m' FROM test ALIGN '1h' by (tag_0,tag_1);"#;
        let expected = String::from(
            "Projection: avg(test.field_0 + test.field_1) RANGE 5m / Int64(4) [avg(test.field_0 + test.field_1) RANGE 5m / Int64(4):Float64;N]\
            \n  RangeSelect: range_exprs=[avg(test.field_0 + test.field_1) RANGE 5m], align=3600000ms, align_to=0ms, align_by=[test.tag_0, test.tag_1], time_index=timestamp [avg(test.field_0 + test.field_1) RANGE 5m:Float64;N, timestamp:Timestamp(ms), tag_0:Utf8, tag_1:Utf8]\
            \n    TableScan: test [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8, timestamp:Timestamp(ms), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N, field_3:Float64;N, field_4:Float64;N]",
        );
        query_plan_compare(query, expected).await;
    }
718
    /// A multi-argument aggregate (`covar`) used as a range expression without
    /// an explicit `by` clause: the expected plan aligns by all five tag
    /// columns of the test table.
    #[tokio::test]
    async fn range_multi_args() {
        let query =
            r#"SELECT (covar(field_0 + field_1, field_1)/4) RANGE '5m' FROM test ALIGN '1h';"#;
        let expected = String::from(
            "Projection: covar_samp(test.field_0 + test.field_1,test.field_1) RANGE 5m / Int64(4) [covar_samp(test.field_0 + test.field_1,test.field_1) RANGE 5m / Int64(4):Float64;N]\
            \n  RangeSelect: range_exprs=[covar_samp(test.field_0 + test.field_1,test.field_1) RANGE 5m], align=3600000ms, align_to=0ms, align_by=[test.tag_0, test.tag_1, test.tag_2, test.tag_3, test.tag_4], time_index=timestamp [covar_samp(test.field_0 + test.field_1,test.field_1) RANGE 5m:Float64;N, timestamp:Timestamp(ms), tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8]\
            \n    TableScan: test [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8, timestamp:Timestamp(ms), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N, field_3:Float64;N, field_4:Float64;N]",
        );
        query_plan_compare(query, expected).await;
    }
730
731    #[tokio::test]
732    async fn range_calculation() {
733        let query = r#"SELECT ((avg(field_0)+sum(field_1))/4) RANGE '5m' FROM test ALIGN '1h' by (tag_0,tag_1) FILL NULL;"#;
734        let expected = String::from(
735            "Projection: (avg(test.field_0) RANGE 5m FILL NULL + sum(test.field_1) RANGE 5m FILL NULL) / Int64(4) [avg(test.field_0) RANGE 5m FILL NULL + sum(test.field_1) RANGE 5m FILL NULL / Int64(4):Float64;N]\
736            \n  RangeSelect: range_exprs=[avg(test.field_0) RANGE 5m FILL NULL, sum(test.field_1) RANGE 5m FILL NULL], align=3600000ms, align_to=0ms, align_by=[test.tag_0, test.tag_1], time_index=timestamp [avg(test.field_0) RANGE 5m FILL NULL:Float64;N, sum(test.field_1) RANGE 5m FILL NULL:Float64;N, timestamp:Timestamp(ms), tag_0:Utf8, tag_1:Utf8]\
737            \n    TableScan: test [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8, timestamp:Timestamp(ms), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N, field_3:Float64;N, field_4:Float64;N]",
738        );
739        query_plan_compare(query, expected).await;
740    }
741
742    #[tokio::test]
743    async fn range_as_sub_query() {
744        let query = r#"SELECT foo + 1 from (SELECT ((avg(field_0)+sum(field_1))/4) RANGE '5m' as foo FROM test ALIGN '1h' by (tag_0,tag_1) FILL NULL) where foo > 1;"#;
745        let expected = String::from(
746            "Projection: foo + Int64(1) [foo + Int64(1):Float64;N]\
747            \n  Filter: foo > Int64(1) [foo:Float64;N]\
748            \n    Projection: (avg(test.field_0) RANGE 5m FILL NULL + sum(test.field_1) RANGE 5m FILL NULL) / Int64(4) AS foo [foo:Float64;N]\
749            \n      RangeSelect: range_exprs=[avg(test.field_0) RANGE 5m FILL NULL, sum(test.field_1) RANGE 5m FILL NULL], align=3600000ms, align_to=0ms, align_by=[test.tag_0, test.tag_1], time_index=timestamp [avg(test.field_0) RANGE 5m FILL NULL:Float64;N, sum(test.field_1) RANGE 5m FILL NULL:Float64;N, timestamp:Timestamp(ms), tag_0:Utf8, tag_1:Utf8]\
750            \n        TableScan: test [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8, timestamp:Timestamp(ms), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N, field_3:Float64;N, field_4:Float64;N]",
751        );
752        query_plan_compare(query, expected).await;
753    }
754
755    #[tokio::test]
756    async fn range_from_nest_query() {
757        let query = r#"SELECT ((avg(a)+sum(b))/4) RANGE '5m' FROM (SELECT field_0 as a, field_1 as b, tag_0 as c, tag_1 as d, timestamp from test where field_0 > 1.0) ALIGN '1h' by (c, d) FILL NULL;"#;
758        let expected = String::from(
759            "Projection: (avg(a) RANGE 5m FILL NULL + sum(b) RANGE 5m FILL NULL) / Int64(4) [avg(a) RANGE 5m FILL NULL + sum(b) RANGE 5m FILL NULL / Int64(4):Float64;N]\
760            \n  RangeSelect: range_exprs=[avg(a) RANGE 5m FILL NULL, sum(b) RANGE 5m FILL NULL], align=3600000ms, align_to=0ms, align_by=[c, d], time_index=timestamp [avg(a) RANGE 5m FILL NULL:Float64;N, sum(b) RANGE 5m FILL NULL:Float64;N, timestamp:Timestamp(ms), c:Utf8, d:Utf8]\
761            \n    Projection: test.field_0 AS a, test.field_1 AS b, test.tag_0 AS c, test.tag_1 AS d, test.timestamp [a:Float64;N, b:Float64;N, c:Utf8, d:Utf8, timestamp:Timestamp(ms)]\
762            \n      Filter: test.field_0 > Float64(1) [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8, timestamp:Timestamp(ms), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N, field_3:Float64;N, field_4:Float64;N]\
763            \n        TableScan: test [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8, timestamp:Timestamp(ms), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N, field_3:Float64;N, field_4:Float64;N]",
764        );
765        query_plan_compare(query, expected).await;
766    }
767
768    #[tokio::test]
769    async fn range_in_expr() {
770        let query = r#"SELECT sin(avg(field_0 + field_1) RANGE '5m' + 1) FROM test ALIGN '1h' by (tag_0,tag_1);"#;
771        let expected = String::from(
772            "Projection: sin(avg(test.field_0 + test.field_1) RANGE 5m + Int64(1)) [sin(avg(test.field_0 + test.field_1) RANGE 5m + Int64(1)):Float64;N]\
773            \n  RangeSelect: range_exprs=[avg(test.field_0 + test.field_1) RANGE 5m], align=3600000ms, align_to=0ms, align_by=[test.tag_0, test.tag_1], time_index=timestamp [avg(test.field_0 + test.field_1) RANGE 5m:Float64;N, timestamp:Timestamp(ms), tag_0:Utf8, tag_1:Utf8]\
774            \n    TableScan: test [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8, timestamp:Timestamp(ms), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N, field_3:Float64;N, field_4:Float64;N]",
775        );
776        query_plan_compare(query, expected).await;
777    }
778
779    #[tokio::test]
780    async fn duplicate_range_expr() {
781        let query = r#"SELECT avg(field_0) RANGE '5m' FILL 6.0 + avg(field_0) RANGE '5m' FILL 6.0 FROM test ALIGN '1h' by (tag_0,tag_1);"#;
782        let expected = String::from(
783            "Projection: avg(test.field_0) RANGE 5m FILL 6 + avg(test.field_0) RANGE 5m FILL 6 [avg(test.field_0) RANGE 5m FILL 6 + avg(test.field_0) RANGE 5m FILL 6:Float64]\
784            \n  RangeSelect: range_exprs=[avg(test.field_0) RANGE 5m FILL 6], align=3600000ms, align_to=0ms, align_by=[test.tag_0, test.tag_1], time_index=timestamp [avg(test.field_0) RANGE 5m FILL 6:Float64, timestamp:Timestamp(ms), tag_0:Utf8, tag_1:Utf8]\
785            \n    TableScan: test [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8, timestamp:Timestamp(ms), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N, field_3:Float64;N, field_4:Float64;N]",
786        );
787        query_plan_compare(query, expected).await;
788    }
789
790    #[tokio::test]
791    async fn deep_nest_range_expr() {
792        let query = r#"SELECT round(sin(avg(field_0 + field_1) RANGE '5m' + 1)) FROM test ALIGN '1h' by (tag_0,tag_1);"#;
793        let expected = String::from(
794            "Projection: round(sin(avg(test.field_0 + test.field_1) RANGE 5m + Int64(1))) [round(sin(avg(test.field_0 + test.field_1) RANGE 5m + Int64(1))):Float64;N]\
795            \n  RangeSelect: range_exprs=[avg(test.field_0 + test.field_1) RANGE 5m], align=3600000ms, align_to=0ms, align_by=[test.tag_0, test.tag_1], time_index=timestamp [avg(test.field_0 + test.field_1) RANGE 5m:Float64;N, timestamp:Timestamp(ms), tag_0:Utf8, tag_1:Utf8]\
796            \n    TableScan: test [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8, timestamp:Timestamp(ms), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N, field_3:Float64;N, field_4:Float64;N]",
797        );
798        query_plan_compare(query, expected).await;
799    }
800
801    #[tokio::test]
802    async fn complex_range_expr() {
803        let query = r#"SELECT gcd(CAST(max(field_0 + 1) Range '5m' FILL NULL AS Int64), CAST(tag_0 AS Int64)) + round(max(field_2+1) Range '6m' FILL NULL + 1) + max(field_2+3) Range '10m' FILL NULL * CAST(tag_1 AS Float64) + 1 FROM test ALIGN '1h' by (tag_0, tag_1);"#;
804        let expected = String::from(
805            "Projection: gcd(arrow_cast(max(test.field_0 + Int64(1)) RANGE 5m FILL NULL, Utf8(\"Int64\")), arrow_cast(test.tag_0, Utf8(\"Int64\"))) + round(max(test.field_2 + Int64(1)) RANGE 6m FILL NULL + Int64(1)) + max(test.field_2 + Int64(3)) RANGE 10m FILL NULL * arrow_cast(test.tag_1, Utf8(\"Float64\")) + Int64(1) [gcd(arrow_cast(max(test.field_0 + Int64(1)) RANGE 5m FILL NULL,Utf8(\"Int64\")),arrow_cast(test.tag_0,Utf8(\"Int64\"))) + round(max(test.field_2 + Int64(1)) RANGE 6m FILL NULL + Int64(1)) + max(test.field_2 + Int64(3)) RANGE 10m FILL NULL * arrow_cast(test.tag_1,Utf8(\"Float64\")) + Int64(1):Float64;N]\
806            \n  RangeSelect: range_exprs=[max(test.field_0 + Int64(1)) RANGE 5m FILL NULL, max(test.field_2 + Int64(1)) RANGE 6m FILL NULL, max(test.field_2 + Int64(3)) RANGE 10m FILL NULL], align=3600000ms, align_to=0ms, align_by=[test.tag_0, test.tag_1], time_index=timestamp [max(test.field_0 + Int64(1)) RANGE 5m FILL NULL:Float64;N, max(test.field_2 + Int64(1)) RANGE 6m FILL NULL:Float64;N, max(test.field_2 + Int64(3)) RANGE 10m FILL NULL:Float64;N, timestamp:Timestamp(ms), tag_0:Utf8, tag_1:Utf8]\
807            \n    TableScan: test [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8, timestamp:Timestamp(ms), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N, field_3:Float64;N, field_4:Float64;N]",
808        );
809        query_plan_compare(query, expected).await;
810    }
811
812    #[tokio::test]
813    async fn range_linear_on_integer() {
814        let query = r#"SELECT min(CAST(field_0 AS Int64) + CAST(field_1 AS Int64)) RANGE '5m' FILL LINEAR FROM test ALIGN '1h' by (tag_0,tag_1);"#;
815        let expected = String::from(
816            "RangeSelect: range_exprs=[min(arrow_cast(test.field_0,Utf8(\"Int64\")) + arrow_cast(test.field_1,Utf8(\"Int64\"))) RANGE 5m FILL LINEAR], align=3600000ms, align_to=0ms, align_by=[test.tag_0, test.tag_1], time_index=timestamp [min(arrow_cast(test.field_0,Utf8(\"Int64\")) + arrow_cast(test.field_1,Utf8(\"Int64\"))) RANGE 5m FILL LINEAR:Float64;N]\
817            \n  TableScan: test [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8, timestamp:Timestamp(ms), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N, field_3:Float64;N, field_4:Float64;N]",
818        );
819        query_plan_compare(query, expected).await;
820    }
821
822    #[tokio::test]
823    async fn range_nest_range_err() {
824        let query = r#"SELECT sum(avg(field_0 + field_1) RANGE '5m' + 1) RANGE '5m' + 1 FROM test ALIGN '1h' by (tag_0,tag_1);"#;
825        assert_eq!(
826            do_query(query).await.unwrap_err().to_string(),
827            "Range Query: Nest Range Query is not allowed"
828        )
829    }
830
831    #[tokio::test]
832    /// Start directly from the rewritten SQL and check whether the error reported by the range expression rewriting is as expected.
833    /// the right argument is `range_fn(avg(field_0), '5m', 'NULL', '0', '1h')`
834    async fn range_argument_err_1() {
835        let query = r#"SELECT range_fn('5m', avg(field_0), 'NULL', '1', tag_0, '1h') FROM test group by tag_0;"#;
836        let error = do_query(query).await.unwrap_err().to_string();
837        assert_eq!(
838            error,
839            "Error during planning: Illegal argument `Utf8(\"5m\")` in range select query"
840        )
841    }
842
843    #[tokio::test]
844    async fn range_argument_err_2() {
845        let query = r#"SELECT range_fn(avg(field_0), 5, 'NULL', '1', tag_0, '1h') FROM test group by tag_0;"#;
846        let error = do_query(query).await.unwrap_err().to_string();
847        assert_eq!(
848            error,
849            "Error during planning: Illegal argument `Int64(5)` in range select query"
850        )
851    }
852
853    #[test]
854    fn test_parse_duration_expr() {
855        // test IntervalYearMonth
856        let interval = IntervalYearMonth::new(10);
857        let args = vec![ScalarValue::IntervalYearMonth(Some(interval.to_i32())).lit()];
858        assert!(parse_duration_expr(&args, 0).is_err(),);
859        // test IntervalDayTime
860        let interval = IntervalDayTime::new(10, 10);
861        let args = vec![ScalarValue::IntervalDayTime(Some(interval.into())).lit()];
862        assert_eq!(
863            parse_duration_expr(&args, 0).unwrap().as_millis() as i64,
864            interval.as_millis()
865        );
866        // test IntervalMonthDayNano
867        let interval = IntervalMonthDayNano::new(0, 10, 10);
868        let args = vec![ScalarValue::IntervalMonthDayNano(Some(interval.into())).lit()];
869        assert_eq!(
870            parse_duration_expr(&args, 0).unwrap().as_millis() as i64,
871            interval.days as i64 * MS_PER_DAY + interval.nanoseconds / NANOS_PER_MILLI,
872        );
873        // test Duration
874        let args = vec!["1y4w".lit()];
875        assert_eq!(
876            parse_duration_expr(&args, 0).unwrap(),
877            parse_duration("1y4w").unwrap()
878        );
879        // test cast expression
880        let args = vec![Expr::Cast(Cast {
881            expr: Box::new("15 minutes".lit()),
882            data_type: DataType::Interval(IntervalUnit::MonthDayNano),
883        })];
884        assert_eq!(
885            parse_duration_expr(&args, 0).unwrap(),
886            parse_duration("15m").unwrap()
887        );
888        // test index err
889        assert!(parse_duration_expr(&args, 10).is_err());
890        // test evaluate expr
891        let args = vec![Expr::BinaryExpr(BinaryExpr {
892            left: Box::new(
893                ScalarValue::IntervalDayTime(Some(IntervalDayTime::new(0, 10).into())).lit(),
894            ),
895            op: Operator::Plus,
896            right: Box::new(
897                ScalarValue::IntervalDayTime(Some(IntervalDayTime::new(0, 10).into())).lit(),
898            ),
899        })];
900        assert_eq!(
901            parse_duration_expr(&args, 0).unwrap(),
902            Duration::from_millis(20)
903        );
904        let args = vec![Expr::BinaryExpr(BinaryExpr {
905            left: Box::new(
906                ScalarValue::IntervalDayTime(Some(IntervalDayTime::new(0, 10).into())).lit(),
907            ),
908            op: Operator::Minus,
909            right: Box::new(
910                ScalarValue::IntervalDayTime(Some(IntervalDayTime::new(0, 10).into())).lit(),
911            ),
912        })];
913        // test zero interval error
914        assert!(parse_duration_expr(&args, 0).is_err());
915        // test must all be interval
916        let args = vec![Expr::BinaryExpr(BinaryExpr {
917            left: Box::new(
918                ScalarValue::IntervalYearMonth(Some(IntervalYearMonth::new(10).to_i32())).lit(),
919            ),
920            op: Operator::Minus,
921            right: Box::new(ScalarValue::Time64Microsecond(Some(0)).lit()),
922        })];
923        assert!(parse_duration_expr(&args, 0).is_err());
924    }
925
926    #[test]
927    fn test_parse_align_to() {
928        // test NOW
929        let args = vec!["NOW".lit()];
930        let epsinon = parse_align_to(&args, 0, None).unwrap() - Timestamp::current_millis().value();
931        assert!(epsinon.abs() < 100);
932        // test default
933        let args = vec!["".lit()];
934        assert_eq!(0, parse_align_to(&args, 0, None).unwrap());
935        // test default with timezone
936        let args = vec!["".lit()];
937        assert_eq!(
938            -36000 * 1000,
939            parse_align_to(&args, 0, Some(&Timezone::from_tz_string("HST").unwrap())).unwrap()
940        );
941        assert_eq!(
942            28800 * 1000,
943            parse_align_to(
944                &args,
945                0,
946                Some(&Timezone::from_tz_string("Asia/Shanghai").unwrap())
947            )
948            .unwrap()
949        );
950
951        // test Timestamp
952        let args = vec!["1970-01-01T00:00:00+08:00".lit()];
953        assert_eq!(parse_align_to(&args, 0, None).unwrap(), -8 * 60 * 60 * 1000);
954        // timezone
955        let args = vec!["1970-01-01T00:00:00".lit()];
956        assert_eq!(
957            parse_align_to(
958                &args,
959                0,
960                Some(&Timezone::from_tz_string("Asia/Shanghai").unwrap())
961            )
962            .unwrap(),
963            -8 * 60 * 60 * 1000
964        );
965        // test evaluate expr
966        let args = vec![Expr::BinaryExpr(BinaryExpr {
967            left: Box::new(
968                ScalarValue::IntervalDayTime(Some(IntervalDayTime::new(0, 10).into())).lit(),
969            ),
970            op: Operator::Plus,
971            right: Box::new(
972                ScalarValue::IntervalDayTime(Some(IntervalDayTime::new(0, 10).into())).lit(),
973            ),
974        })];
975        assert_eq!(parse_align_to(&args, 0, None).unwrap(), 20);
976    }
977
978    #[test]
979    fn test_interval_only() {
980        let expr = Expr::BinaryExpr(BinaryExpr {
981            left: Box::new(ScalarValue::DurationMillisecond(Some(20)).lit()),
982            op: Operator::Minus,
983            right: Box::new(
984                ScalarValue::IntervalDayTime(Some(IntervalDayTime::new(10, 0).into())).lit(),
985            ),
986        });
987        assert!(!interval_only_in_expr(&expr));
988        let expr = Expr::BinaryExpr(BinaryExpr {
989            left: Box::new(
990                ScalarValue::IntervalDayTime(Some(IntervalDayTime::new(10, 0).into())).lit(),
991            ),
992            op: Operator::Minus,
993            right: Box::new(
994                ScalarValue::IntervalDayTime(Some(IntervalDayTime::new(10, 0).into())).lit(),
995            ),
996        });
997        assert!(interval_only_in_expr(&expr));
998
999        let expr = Expr::BinaryExpr(BinaryExpr {
1000            left: Box::new(Expr::Cast(Cast {
1001                expr: Box::new("15 minute".lit()),
1002                data_type: DataType::Interval(IntervalUnit::MonthDayNano),
1003            })),
1004            op: Operator::Minus,
1005            right: Box::new(
1006                ScalarValue::IntervalDayTime(Some(IntervalDayTime::new(10, 0).into())).lit(),
1007            ),
1008        });
1009        assert!(interval_only_in_expr(&expr));
1010
1011        let expr = Expr::Cast(Cast {
1012            expr: Box::new(Expr::BinaryExpr(BinaryExpr {
1013                left: Box::new(Expr::Cast(Cast {
1014                    expr: Box::new("15 minute".lit()),
1015                    data_type: DataType::Interval(IntervalUnit::MonthDayNano),
1016                })),
1017                op: Operator::Minus,
1018                right: Box::new(
1019                    ScalarValue::IntervalDayTime(Some(IntervalDayTime::new(10, 0).into())).lit(),
1020                ),
1021            })),
1022            data_type: DataType::Interval(IntervalUnit::MonthDayNano),
1023        });
1024
1025        assert!(interval_only_in_expr(&expr));
1026    }
1027}