// query/range_select/plan_rewrite.rs
1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::BTreeSet;
16use std::sync::Arc;
17use std::time::Duration;
18
19use arrow_schema::DataType;
20use async_recursion::async_recursion;
21use catalog::table_source::DfTableSourceProvider;
22use common_time::interval::{MS_PER_DAY, NANOS_PER_MILLI};
23use common_time::timestamp::TimeUnit;
24use common_time::{IntervalDayTime, IntervalMonthDayNano, IntervalYearMonth, Timestamp, Timezone};
25use datafusion::datasource::DefaultTableSource;
26use datafusion::prelude::Column;
27use datafusion::scalar::ScalarValue;
28use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRecursion, TreeNodeRewriter};
29use datafusion_common::{DFSchema, DataFusionError, Result as DFResult};
30use datafusion_expr::expr::WildcardOptions;
31use datafusion_expr::simplify::SimplifyContext;
32use datafusion_expr::{
33    Aggregate, Analyze, Cast, Distinct, DistinctOn, Explain, Expr, ExprSchemable, Extension,
34    Literal, LogicalPlan, LogicalPlanBuilder, Projection,
35};
36use datafusion_optimizer::simplify_expressions::ExprSimplifier;
37use datatypes::prelude::ConcreteDataType;
38use datatypes::schema::TIME_INDEX_KEY;
39use promql_parser::util::parse_duration;
40use session::context::QueryContextRef;
41use snafu::{OptionExt, ResultExt, ensure};
42use table::table::adapter::DfTableProviderAdapter;
43
44use crate::error::{
45    CatalogSnafu, RangeQuerySnafu, Result, TimeIndexNotFoundSnafu, UnknownTableSnafu,
46};
47use crate::plan::ExtractExpr;
48use crate::range_select::plan::{Fill, RangeFn, RangeSelect};
49
/// `RangeExprRewriter` will recursively search certain `Expr`, find all `range_fn` scalar udf contained in `Expr`,
/// and collect the information required by the RangeSelect query,
/// and finally modify the `range_fn` scalar udf to an ordinary column field.
pub struct RangeExprRewriter<'a> {
    /// The input plan of the sub-aggregate (the plan the range expressions are typed against).
    input_plan: &'a Arc<LogicalPlan>,
    /// Step width of the time slots (`ALIGN` clause); `Duration::default()` until first seen.
    align: Duration,
    /// UTC millisecond timestamp used as the origin of slot division (`ALIGN TO` clause).
    align_to: i64,
    /// `BY` expressions defining the time-series grouping; empty means "use table default".
    by: Vec<Expr>,
    /// Use `BTreeSet` to avoid, in a case like `avg(a) RANGE '5m' + avg(a) RANGE '5m'`,
    /// the duplicate range expr `avg(a) RANGE '5m'` being calculated twice.
    range_fn: BTreeSet<RangeFn>,
    /// The `Aggregate` plan beneath the projection being rewritten; used to resolve
    /// `range_fn` argument columns back to their aggregate expressions.
    sub_aggr: &'a Aggregate,
    /// Session context; supplies the timezone for `ALIGN TO` parsing.
    query_ctx: &'a QueryContextRef,
}
63
64impl RangeExprRewriter<'_> {
65    pub fn get_range_expr(&self, args: &[Expr], i: usize) -> DFResult<Expr> {
66        match args.get(i) {
67            Some(Expr::Column(column)) => {
68                let index = self.sub_aggr.schema.index_of_column(column)?;
69                let len = self.sub_aggr.group_expr.len();
70                self.sub_aggr
71                    .aggr_expr
72                    .get(index - len)
73                    .cloned()
74                    .ok_or(DataFusionError::Plan(
75                        "Range expr not found in underlying Aggregate Plan".into(),
76                    ))
77            }
78            Some(Expr::Alias(alias)) => {
79                self.get_range_expr(std::slice::from_ref(alias.expr.as_ref()), 0)
80            }
81            other => Err(dispose_parse_error(other)),
82        }
83    }
84}
85
86#[inline]
87fn dispose_parse_error(expr: Option<&Expr>) -> DataFusionError {
88    DataFusionError::Plan(
89        expr.map(|x| {
90            format!(
91                "Illegal argument `{}` in range select query",
92                x.schema_name()
93            )
94        })
95        .unwrap_or("Missing argument in range select query".into()),
96    )
97}
98
99fn parse_str_expr(args: &[Expr], i: usize) -> DFResult<&str> {
100    match args.get(i) {
101        Some(Expr::Literal(ScalarValue::Utf8(Some(str)), _)) => Ok(str.as_str()),
102        other => Err(dispose_parse_error(other)),
103    }
104}
105
106fn parse_expr_to_string(args: &[Expr], i: usize) -> DFResult<String> {
107    match args.get(i) {
108        Some(Expr::Literal(ScalarValue::Utf8(Some(str)), _)) => Ok(str.clone()),
109        Some(expr) => Ok(expr.schema_name().to_string()),
110        None => Err(dispose_parse_error(None)),
111    }
112}
113
114/// Parse a duraion expr:
115/// 1. duration string (e.g. `'1h'`)
116/// 2. Interval expr (e.g. `INTERVAL '1 year 3 hours 20 minutes'`)
117/// 3. An interval expr can be evaluated at the logical plan stage (e.g. `INTERVAL '2' day - INTERVAL '1' day`)
118fn parse_duration_expr(args: &[Expr], i: usize) -> DFResult<Duration> {
119    match args.get(i) {
120        Some(Expr::Literal(ScalarValue::Utf8(Some(str)), _)) => {
121            parse_duration(str).map_err(DataFusionError::Plan)
122        }
123        Some(expr) => {
124            let ms = evaluate_expr_to_millisecond(args, i, true)?;
125            if ms <= 0 {
126                return Err(dispose_parse_error(Some(expr)));
127            }
128            Ok(Duration::from_millis(ms as u64))
129        }
130        None => Err(dispose_parse_error(None)),
131    }
132}
133
/// Evaluate a time calculation expr, case like:
/// 1. `INTERVAL '1' day + INTERVAL '1 year 2 hours 3 minutes'`
/// 2. `now() - INTERVAL '1' day` (when `interval_only==false`)
///
/// Output a millisecond timestamp
///
/// if `interval_only==true`, only accept expr with all interval type (case 2 will return a error)
fn evaluate_expr_to_millisecond(args: &[Expr], i: usize, interval_only: bool) -> DFResult<i64> {
    let Some(expr) = args.get(i) else {
        return Err(dispose_parse_error(None));
    };
    // In interval-only mode, reject anything that is not purely built from
    // interval literals/casts and binary ops (e.g. a `now()` call).
    if interval_only && !interval_only_in_expr(expr) {
        return Err(dispose_parse_error(Some(expr)));
    }
    // Constant-fold the expression. `with_current_time` presumably makes
    // `now()` resolvable during simplification — TODO confirm against the
    // datafusion SimplifyContext docs.
    let info = SimplifyContext::default().with_current_time();
    let simplify_expr = ExprSimplifier::new(info).simplify(expr.clone())?;
    // Normalize every supported literal unit to milliseconds; any other result
    // falls through to `None` and becomes the error below.
    match simplify_expr {
        Expr::Literal(ScalarValue::TimestampNanosecond(ts_nanos, _), _)
        | Expr::Literal(ScalarValue::DurationNanosecond(ts_nanos), _) => {
            ts_nanos.map(|v| v / 1_000_000)
        }
        Expr::Literal(ScalarValue::TimestampMicrosecond(ts_micros, _), _)
        | Expr::Literal(ScalarValue::DurationMicrosecond(ts_micros), _) => {
            ts_micros.map(|v| v / 1_000)
        }
        Expr::Literal(ScalarValue::TimestampMillisecond(ts_millis, _), _)
        | Expr::Literal(ScalarValue::DurationMillisecond(ts_millis), _) => ts_millis,
        Expr::Literal(ScalarValue::TimestampSecond(ts_secs, _), _)
        | Expr::Literal(ScalarValue::DurationSecond(ts_secs), _) => ts_secs.map(|v| v * 1_000),
        // We don't support interval with months as days in a month is unclear.
        Expr::Literal(ScalarValue::IntervalYearMonth(interval), _) => interval
            .map(|v| {
                let interval = IntervalYearMonth::from_i32(v);
                if interval.months != 0 {
                    return Err(DataFusionError::Plan(format!(
                        "Year or month interval is not allowed in range query: {}",
                        expr.schema_name()
                    )));
                }

                // A zero year-month interval contributes 0 ms.
                Ok(0)
            })
            .transpose()?,
        Expr::Literal(ScalarValue::IntervalDayTime(interval), _) => interval.map(|v| {
            let interval = IntervalDayTime::from(v);
            interval.as_millis()
        }),
        Expr::Literal(ScalarValue::IntervalMonthDayNano(interval), _) => interval
            .map(|v| {
                let interval = IntervalMonthDayNano::from(v);
                if interval.months != 0 {
                    return Err(DataFusionError::Plan(format!(
                        "Year or month interval is not allowed in range query: {}",
                        expr.schema_name()
                    )));
                }

                // Days and sub-day nanoseconds both convert unambiguously.
                Ok(interval.days as i64 * MS_PER_DAY + interval.nanoseconds / NANOS_PER_MILLI)
            })
            .transpose()?,
        _ => None,
    }
    .ok_or_else(|| {
        DataFusionError::Plan(format!(
            "{} is not a expr can be evaluate and use in range query",
            expr.schema_name()
        ))
    })
}
203
204/// Parse the `align to` clause and return a UTC timestamp with unit of millisecond,
205/// which is used as the basis for dividing time slot during the align operation.
206/// 1. NOW: align to current execute time
207/// 2. Timestamp string: align to specific timestamp
208/// 3. An expr can be evaluated at the logical plan stage (e.g. `now() - INTERVAL '1' day`)
209/// 4. leave empty (as Default Option): align to unix epoch 0 (timezone aware)
210fn parse_align_to(args: &[Expr], i: usize, timezone: Option<&Timezone>) -> DFResult<i64> {
211    let Ok(s) = parse_str_expr(args, i) else {
212        return evaluate_expr_to_millisecond(args, i, false);
213    };
214    let upper = s.to_uppercase();
215    match upper.as_str() {
216        "NOW" => return Ok(Timestamp::current_millis().value()),
217        // default align to unix epoch 0 (timezone aware)
218        "" => return Ok(timezone.map(|tz| tz.local_minus_utc() * 1000).unwrap_or(0)),
219        _ => (),
220    }
221
222    Timestamp::from_str(s, timezone)
223        .map_err(|e| {
224            DataFusionError::Plan(format!(
225                "Illegal `align to` argument `{}` in range select query, can't be parse as NOW/CALENDAR/Timestamp, error: {}",
226                s, e
227            ))
228        })?.convert_to(TimeUnit::Millisecond).map(|x|x.value()).ok_or(DataFusionError::Plan(format!(
229            "Illegal `align to` argument `{}` in range select query, can't be convert to a valid Timestamp",
230            s
231        ))
232        )
233}
234
235fn parse_expr_list(args: &[Expr], start: usize, len: usize) -> DFResult<Vec<Expr>> {
236    let mut outs = Vec::with_capacity(len);
237    for i in start..start + len {
238        outs.push(match &args.get(i) {
239            Some(
240                Expr::Column(_)
241                | Expr::Literal(_, _)
242                | Expr::BinaryExpr(_)
243                | Expr::ScalarFunction(_),
244            ) => args[i].clone(),
245            Some(Expr::Alias(alias)) if matches!(*alias.expr, Expr::ScalarFunction(_)) => {
246                args[i].clone()
247            }
248            other => {
249                return Err(dispose_parse_error(*other));
250            }
251        });
252    }
253    Ok(outs)
254}
255
/// Ensures every `range_fn` in one query agrees on a shared parameter.
///
/// Deliberately captures a call-site local variable with the same identifier as
/// the field (`$name` is an ident fragment, so it resolves at the call site):
/// when `$cond` says the field was already set and the new value differs, the
/// rewrite fails; otherwise the field is (re)assigned.
macro_rules! inconsistent_check {
    ($self: ident.$name: ident, $cond: expr) => {
        if $cond && $self.$name != $name {
            return Err(DataFusionError::Plan(
                concat!(
                    "Inconsistent ",
                    stringify!($name),
                    " given in Range Function Rewrite"
                )
                .into(),
            ));
        } else {
            $self.$name = $name;
        }
    };
}
272
impl TreeNodeRewriter for RangeExprRewriter<'_> {
    type Node = Expr;

    /// Pre-order hook: when the node is a `range_fn` scalar udf call, parse its
    /// packed arguments, record the collected [`RangeFn`] on `self`, and replace
    /// the call with a plain column reference named after the range expression.
    /// All other nodes pass through unchanged.
    fn f_down(&mut self, node: Expr) -> DFResult<Transformed<Expr>> {
        if let Expr::ScalarFunction(func) = &node
            && func.name() == "range_fn"
        {
            // `range_fn(func, range, fill, byc, [byv], align, to)`
            // `[byv]` are variadic arguments, byc indicate the length of arguments
            let range_expr = self.get_range_expr(&func.args, 0)?;
            let range = parse_duration_expr(&func.args, 1)?;
            // `byc` is serialized as a string literal; parse it back to a count.
            let byc = str::parse::<usize>(parse_str_expr(&func.args, 3)?)
                .map_err(|e| DataFusionError::Plan(e.to_string()))?;
            let by = parse_expr_list(&func.args, 4, byc)?;
            // `align`/`align to` follow the variadic `[byv]` block.
            let align = parse_duration_expr(&func.args, byc + 4)?;
            let align_to = parse_align_to(&func.args, byc + 5, Some(&self.query_ctx.timezone()))?;
            let mut data_type = range_expr.get_type(self.input_plan.schema())?;
            let mut need_cast = false;
            let fill = Fill::try_from_str(parse_str_expr(&func.args, 2)?, &data_type)?;
            // Linear interpolation over integers produces fractional values,
            // so promote the output type to Float64 and remember to cast.
            if matches!(fill, Some(Fill::Linear)) && data_type.is_integer() {
                data_type = DataType::Float64;
                need_cast = true;
            }
            // All range_fns in one query must share by/align/align_to.
            inconsistent_check!(self.by, !self.by.is_empty());
            inconsistent_check!(self.align, self.align != Duration::default());
            inconsistent_check!(self.align_to, self.align_to != 0);
            // The display name doubles as the output column name; FILL is only
            // included when explicitly given.
            let range_fn = RangeFn {
                name: if let Some(fill) = &fill {
                    format!(
                        "{} RANGE {} FILL {}",
                        range_expr.schema_name(),
                        parse_expr_to_string(&func.args, 1)?,
                        fill
                    )
                } else {
                    format!(
                        "{} RANGE {}",
                        range_expr.schema_name(),
                        parse_expr_to_string(&func.args, 1)?,
                    )
                },
                data_type,
                expr: range_expr,
                range,
                fill,
                need_cast,
            };
            let alias = Expr::Column(Column::from_name(range_fn.name.clone()));
            // BTreeSet deduplicates identical range exprs used multiple times.
            self.range_fn.insert(range_fn);
            return Ok(Transformed::yes(alias));
        }
        Ok(Transformed::no(node))
    }
}
327
/// In order to implement RangeSelect query like `avg(field_0) RANGE '5m' FILL NULL`,
/// All RangeSelect query items are converted into udf scalar function in sql parse stage, with format like `range_fn(avg(field_0), .....)`.
/// `range_fn` contains all the parameters we need to execute RangeSelect.
/// In order to correctly execute the query process of range select, we need to modify the query plan generated by datafusion.
/// We need to recursively find the entire LogicalPlan, and find all `range_fn` scalar udf contained in the project plan,
/// collecting info we need to generate RangeSelect Query LogicalPlan and rewrite the original LogicalPlan.
pub struct RangePlanRewriter {
    /// Resolves table references to providers, used to recover the time index
    /// and default row-key grouping of base tables.
    table_provider: DfTableSourceProvider,
    /// Session context; supplies the timezone for `ALIGN TO` parsing.
    query_ctx: QueryContextRef,
}
338
impl RangePlanRewriter {
    pub fn new(table_provider: DfTableSourceProvider, query_ctx: QueryContextRef) -> Self {
        Self {
            table_provider,
            query_ctx,
        }
    }

    /// Rewrites `plan`, returning the transformed plan when any `range_fn` was
    /// found, or the original plan unchanged otherwise.
    pub async fn rewrite(&mut self, plan: LogicalPlan) -> Result<LogicalPlan> {
        match self.rewrite_logical_plan(&plan).await? {
            Some(new_plan) => Ok(new_plan),
            None => Ok(plan),
        }
    }

    /// Recursively rewrites the plan tree. Returns `Ok(None)` when neither this
    /// node nor any input needed rewriting, letting parents keep the original
    /// sub-plan without rebuilding it.
    #[async_recursion]
    async fn rewrite_logical_plan(&mut self, plan: &LogicalPlan) -> Result<Option<LogicalPlan>> {
        let inputs = plan.inputs();
        let mut new_inputs = Vec::with_capacity(inputs.len());
        for input in &inputs {
            new_inputs.push(self.rewrite_logical_plan(input).await?)
        }
        match plan {
            // A projection containing `range_fn` anchors a range query; its
            // input must be the Aggregate generated for the range exprs.
            LogicalPlan::Projection(Projection { expr, input, .. })
                if have_range_in_exprs(expr) =>
            {
                let (aggr_plan, input) = if let LogicalPlan::Aggregate(aggr) = input.as_ref() {
                    // Expr like `rate(max(a) RANGE '6m') RANGE '6m'` have legal syntax but illegal semantic.
                    if have_range_in_exprs(&aggr.aggr_expr) {
                        return RangeQuerySnafu {
                            msg: "Nest Range Query is not allowed",
                        }
                        .fail();
                    }
                    (aggr, aggr.input.clone())
                } else {
                    return RangeQuerySnafu {
                        msg: "Window functions is not allowed in Range Query",
                    }
                    .fail();
                };
                let query_ctx = self.query_ctx.clone();
                let mut range_rewriter = RangeExprRewriter {
                    input_plan: &input,
                    align: Duration::default(),
                    align_to: 0,
                    by: vec![],
                    range_fn: BTreeSet::new(),
                    sub_aggr: aggr_plan,
                    query_ctx: &query_ctx,
                };
                // Rewrite every projection expr: collect range info into
                // `range_rewriter` while replacing each `range_fn` call with a
                // plain column reference.
                let new_expr = expr
                    .iter()
                    .map(|expr| expr.clone().rewrite(&mut range_rewriter).map(|x| x.data))
                    .collect::<DFResult<Vec<_>>>()?;
                // If the user omitted `BY`, fall back to the table's row keys.
                let need_default_by = range_rewriter.by.is_empty();
                let (time_index, default_by) =
                    self.get_index_by(input.schema(), need_default_by).await?;
                if need_default_by {
                    range_rewriter.by = default_by;
                }
                let range_select = RangeSelect::try_new(
                    input.clone(),
                    range_rewriter.range_fn.into_iter().collect(),
                    range_rewriter.align,
                    range_rewriter.align_to,
                    time_index,
                    range_rewriter.by,
                    &new_expr,
                )?;
                // When RangeSelect carries its own schema projection, the extra
                // Projection node on top is unnecessary.
                let no_additional_project = range_select.schema_project.is_some();
                let range_plan = LogicalPlan::Extension(Extension {
                    node: Arc::new(range_select),
                });
                if no_additional_project {
                    Ok(Some(range_plan))
                } else {
                    let project_plan = LogicalPlanBuilder::from(range_plan)
                        .project(new_expr)
                        .and_then(|x| x.build())?;
                    Ok(Some(project_plan))
                }
            }
            _ => {
                if new_inputs.iter().any(|x| x.is_some()) {
                    // At least one child was rewritten: rebuild this node,
                    // substituting rewritten children and keeping the rest.
                    let inputs: Vec<LogicalPlan> = new_inputs
                        .into_iter()
                        .zip(inputs)
                        .map(|(x, y)| match x {
                            Some(plan) => plan,
                            None => y.clone(),
                        })
                        .collect();
                    // Due to the limitations of Datafusion, for `LogicalPlan::Analyze` and `LogicalPlan::Explain`,
                    // directly using the method `with_new_inputs` to rebuild a new `LogicalPlan` will cause an error,
                    // so here we directly use the `LogicalPlanBuilder` to build a new plan.
                    let plan = match plan {
                        LogicalPlan::Analyze(Analyze { verbose, .. }) => {
                            ensure!(
                                inputs.len() == 1,
                                RangeQuerySnafu {
                                    msg: "Illegal subplan nums when rewrite Analyze logical plan",
                                }
                            );
                            LogicalPlanBuilder::from(inputs[0].clone())
                                .explain(*verbose, true)?
                                .build()
                        }
                        LogicalPlan::Explain(Explain { verbose, .. }) => {
                            ensure!(
                                inputs.len() == 1,
                                RangeQuerySnafu {
                                    msg: "Illegal subplan nums when rewrite Explain logical plan",
                                }
                            );
                            LogicalPlanBuilder::from(inputs[0].clone())
                                .explain(*verbose, false)?
                                .build()
                        }
                        LogicalPlan::Distinct(Distinct::On(DistinctOn {
                            on_expr,
                            select_expr,
                            sort_expr,
                            ..
                        })) => {
                            ensure!(
                                inputs.len() == 1,
                                RangeQuerySnafu {
                                    msg: "Illegal subplan nums when rewrite DistinctOn logical plan",
                                }
                            );
                            LogicalPlanBuilder::from(inputs[0].clone())
                                .distinct_on(
                                    on_expr.clone(),
                                    select_expr.clone(),
                                    sort_expr.clone(),
                                )?
                                .build()
                        }
                        _ => plan.with_new_exprs(plan.expressions_consider_join(), inputs),
                    }?;
                    Ok(Some(plan))
                } else {
                    Ok(None)
                }
            }
        }
    }

    /// Finds the time index column and default row-key grouping from the input schema.
    ///
    /// Returns `(time_index, [row_columns])` to the rewriter. If the user omits `BY`,
    /// `[row_columns]` is used as the default time-series grouping.
    ///
    /// For derived inputs such as subqueries, joins, or set operations, the source table
    /// qualifier may no longer resolve back to a table provider. In that case we can still
    /// recover the time index from column metadata, but we cannot safely reconstruct the
    /// original row-key columns, so omitted `BY` must be rejected by the caller.
    async fn get_index_by(
        &mut self,
        schema: &Arc<DFSchema>,
        need_default_by: bool,
    ) -> Result<(Expr, Vec<Expr>)> {
        // Wildcard is used as an "unset" sentinel for the time index expr.
        #[allow(deprecated)]
        let mut time_index_expr = Expr::Wildcard {
            qualifier: None,
            options: Box::new(WildcardOptions::default()),
        };
        let mut default_by = vec![];
        // Fallback: a timestamp field flagged with TIME_INDEX_KEY metadata,
        // usable when the qualifier no longer resolves to a base table.
        let metadata_time_index_expr = (0..schema.fields().len()).find_map(|i| {
            let (qualifier, field) = schema.qualified_field(i);
            if field.metadata().contains_key(TIME_INDEX_KEY)
                && matches!(field.data_type(), DataType::Timestamp(_, _))
            {
                Some(Expr::Column(Column::new(
                    qualifier.cloned(),
                    field.name().clone(),
                )))
            } else {
                None
            }
        });
        for i in 0..schema.fields().len() {
            let (qualifier, _) = schema.qualified_field(i);
            if let Some(table_ref) = qualifier {
                let table_source = match self.table_provider.resolve_table(table_ref.clone()).await
                {
                    Ok(table_source) => table_source,
                    Err(error) => {
                        // `TableNotExist` here usually means the qualifier now refers to a derived
                        // input instead of a base table. We can still salvage the time index from
                        // field metadata, but only when such metadata is present.
                        if matches!(&error, catalog::error::Error::TableNotExist { .. })
                            && metadata_time_index_expr.is_some()
                        {
                            continue;
                        }
                        return Err(error).context(CatalogSnafu);
                    }
                };
                // Unwrap the source down to the concrete table adapter.
                let table = table_source
                    .as_any()
                    .downcast_ref::<DefaultTableSource>()
                    .context(UnknownTableSnafu)?
                    .table_provider
                    .as_any()
                    .downcast_ref::<DfTableProviderAdapter>()
                    .context(UnknownTableSnafu)?
                    .table();
                let schema = table.schema();
                let time_index_column =
                    schema
                        .timestamp_column()
                        .with_context(|| TimeIndexNotFoundSnafu {
                            table: table_ref.to_string(),
                        })?;
                // assert time_index's datatype is timestamp
                if let ConcreteDataType::Timestamp(_) = time_index_column.data_type {
                    default_by = table
                        .table_info()
                        .meta
                        .row_key_column_names()
                        .map(|key| Expr::Column(Column::new(Some(table_ref.clone()), key)))
                        .collect();
                    // If the user does not specify a primary key when creating a table,
                    // then by default all data will be aggregated into one time series,
                    // which is equivalent to using `by(1)` in SQL
                    if default_by.is_empty() {
                        default_by = vec![1.lit()];
                    }
                    time_index_expr = Expr::Column(Column::new(
                        Some(table_ref.clone()),
                        time_index_column.name.clone(),
                    ));
                }
            }
        }
        // No base table yielded a time index: fall back to metadata, which only
        // works when the user supplied an explicit BY clause.
        #[allow(deprecated)]
        if matches!(time_index_expr, Expr::Wildcard { .. })
            && let Some(expr) = metadata_time_index_expr
        {
            common_telemetry::debug!(
                "Range query falling back to time-index metadata for derived input schema: {}",
                schema
            );
            ensure!(
                !need_default_by,
                RangeQuerySnafu {
                    msg: "Cannot infer default BY columns from derived range query input"
                }
            );
            time_index_expr = expr;
        }
        #[allow(deprecated)]
        if matches!(time_index_expr, Expr::Wildcard { .. }) {
            TimeIndexNotFoundSnafu {
                table: schema.to_string(),
            }
            .fail()
        } else {
            Ok((time_index_expr, default_by))
        }
    }
}
603
604fn have_range_in_exprs(exprs: &[Expr]) -> bool {
605    exprs.iter().any(|expr| {
606        let mut find_range = false;
607        let _ = expr.apply(|expr| {
608            Ok(match expr {
609                Expr::ScalarFunction(func) if func.name() == "range_fn" => {
610                    find_range = true;
611                    TreeNodeRecursion::Stop
612                }
613                _ => TreeNodeRecursion::Continue,
614            })
615        });
616        find_range
617    })
618}
619
620fn interval_only_in_expr(expr: &Expr) -> bool {
621    let mut all_interval = true;
622    let _ = expr.apply(|expr| {
623        // A cast expression for an interval.
624        if matches!(
625            expr,
626            Expr::Cast(Cast{
627                expr,
628                data_type: DataType::Interval(_)
629            }) if matches!(&**expr, Expr::Literal(ScalarValue::Utf8(_), _))
630        ) {
631            // Stop checking the sub `expr`,
632            // which is a `Utf8` type and has already been tested above.
633            return Ok(TreeNodeRecursion::Stop);
634        }
635
636        if !matches!(
637            expr,
638            Expr::Literal(ScalarValue::IntervalDayTime(_), _)
639                | Expr::Literal(ScalarValue::IntervalMonthDayNano(_), _)
640                | Expr::Literal(ScalarValue::IntervalYearMonth(_), _)
641                | Expr::BinaryExpr(_)
642                | Expr::Cast(Cast {
643                    data_type: DataType::Interval(_),
644                    ..
645                })
646        ) {
647            all_interval = false;
648            Ok(TreeNodeRecursion::Stop)
649        } else {
650            Ok(TreeNodeRecursion::Continue)
651        }
652    });
653
654    all_interval
655}
656
657#[cfg(test)]
658mod test {
659
660    use arrow::datatypes::IntervalUnit;
661    use catalog::RegisterTableRequest;
662    use catalog::memory::MemoryCatalogManager;
663    use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
664    use common_time::IntervalYearMonth;
665    use datafusion_expr::{BinaryExpr, Literal, Operator};
666    use datatypes::prelude::ConcreteDataType;
667    use datatypes::schema::{ColumnSchema, Schema};
668    use session::context::QueryContext;
669    use table::metadata::{TableInfoBuilder, TableMetaBuilder};
670    use table::table::TableRef;
671    use table::test_util::EmptyTable;
672
673    use super::*;
674    use crate::options::QueryOptions;
675    use crate::parser::QueryLanguageParser;
676    use crate::{QueryEngineFactory, QueryEngineRef};
677
    /// Engine with a single table `test` and no extra timestamp column.
    async fn create_test_engine() -> QueryEngineRef {
        create_test_engine_with_tables(&["test"], false).await
    }
681
    /// Engine with two tables (`test_0`, `test_1`), each carrying an extra
    /// nullable timestamp column, for UNION-style derived-input tests.
    async fn create_union_test_engine() -> QueryEngineRef {
        create_test_engine_with_tables(&["test_0", "test_1"], true).await
    }
685
    /// Builds a query engine over an in-memory catalog containing one empty
    /// table per name in `table_names` (see [`create_test_table`] for the schema).
    async fn create_test_engine_with_tables(
        table_names: &[&str],
        with_extra_timestamp: bool,
    ) -> QueryEngineRef {
        let catalog_list = MemoryCatalogManager::with_default_setup();
        for (i, table_name) in table_names.iter().enumerate() {
            let table = create_test_table(table_name, with_extra_timestamp);
            assert!(
                catalog_list
                    .register_table_sync(RegisterTableRequest {
                        catalog: DEFAULT_CATALOG_NAME.to_string(),
                        schema: DEFAULT_SCHEMA_NAME.to_string(),
                        table_name: (*table_name).to_string(),
                        // Offset ids so each table gets a unique one.
                        table_id: 1024 + i as u32,
                        table,
                    })
                    .is_ok()
            );
        }
        QueryEngineFactory::new(
            catalog_list,
            None,
            None,
            None,
            None,
            false,
            QueryOptions::default(),
        )
        .query_engine()
    }
716
717    fn create_test_table(table_name: &str, with_extra_timestamp: bool) -> TableRef {
718        let mut columns = vec![];
719        for i in 0..5 {
720            columns.push(ColumnSchema::new(
721                format!("tag_{i}"),
722                ConcreteDataType::string_datatype(),
723                false,
724            ));
725        }
726        columns.push(
727            ColumnSchema::new(
728                "timestamp".to_string(),
729                ConcreteDataType::timestamp_millisecond_datatype(),
730                false,
731            )
732            .with_time_index(true),
733        );
734        if with_extra_timestamp {
735            columns.push(ColumnSchema::new(
736                "timestamp_2".to_string(),
737                ConcreteDataType::timestamp_millisecond_datatype(),
738                true,
739            ));
740        }
741        for i in 0..5 {
742            columns.push(ColumnSchema::new(
743                format!("field_{i}"),
744                ConcreteDataType::float64_datatype(),
745                true,
746            ));
747        }
748        let schema = Arc::new(Schema::new(columns));
749        let table_meta = TableMetaBuilder::empty()
750            .schema(schema)
751            .primary_key_indices((0..5).collect())
752            .value_indices(if with_extra_timestamp {
753                (6..12).collect()
754            } else {
755                (6..11).collect()
756            })
757            .next_column_id(1024)
758            .build()
759            .unwrap();
760        let table_info = TableInfoBuilder::default()
761            .name(table_name)
762            .meta(table_meta)
763            .build()
764            .unwrap();
765        EmptyTable::from_table_info(&table_info)
766    }
767
768    async fn do_query(sql: &str) -> Result<LogicalPlan> {
769        let stmt = QueryLanguageParser::parse_sql(sql, &QueryContext::arc()).unwrap();
770        let engine = create_test_engine().await;
771        engine.planner().plan(&stmt, QueryContext::arc()).await
772    }
773
774    async fn do_union_query(sql: &str) -> Result<LogicalPlan> {
775        let stmt = QueryLanguageParser::parse_sql(sql, &QueryContext::arc()).unwrap();
776        let engine = create_union_test_engine().await;
777        engine.planner().plan(&stmt, QueryContext::arc()).await
778    }
779
780    async fn query_plan_compare(sql: &str, expected: String) {
781        let plan = do_query(sql).await.unwrap();
782        assert_eq!(plan.display_indent_schema().to_string(), expected);
783    }
784
    #[tokio::test]
    async fn range_no_project() {
        // A bare range aggregation plans as a RangeSelect directly over the
        // TableScan — no wrapping Projection is needed.
        let query = r#"SELECT timestamp, tag_0, tag_1, avg(field_0 + field_1) RANGE '5m' FROM test ALIGN '1h' by (tag_0,tag_1);"#;
        let expected = String::from(
            "RangeSelect: range_exprs=[avg(test.field_0 + test.field_1) RANGE 5m], align=3600000ms, align_to=0ms, align_by=[test.tag_0, test.tag_1], time_index=timestamp [timestamp:Timestamp(ms), tag_0:Utf8, tag_1:Utf8, avg(test.field_0 + test.field_1) RANGE 5m:Float64;N]\
            \n  TableScan: test [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8, timestamp:Timestamp(ms), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N, field_3:Float64;N, field_4:Float64;N]",
        );
        query_plan_compare(query, expected).await;
    }
794
    #[tokio::test]
    async fn range_expr_calculation() {
        // Arithmetic around the range aggregation (`/4`) is lifted into a
        // Projection above the RangeSelect node.
        let query = r#"SELECT (avg(field_0 + field_1)/4) RANGE '5m' FROM test ALIGN '1h' by (tag_0,tag_1);"#;
        let expected = String::from(
            "Projection: avg(test.field_0 + test.field_1) RANGE 5m / Int64(4) [avg(test.field_0 + test.field_1) RANGE 5m / Int64(4):Float64;N]\
            \n  RangeSelect: range_exprs=[avg(test.field_0 + test.field_1) RANGE 5m], align=3600000ms, align_to=0ms, align_by=[test.tag_0, test.tag_1], time_index=timestamp [avg(test.field_0 + test.field_1) RANGE 5m:Float64;N, timestamp:Timestamp(ms), tag_0:Utf8, tag_1:Utf8]\
            \n    TableScan: test [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8, timestamp:Timestamp(ms), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N, field_3:Float64;N, field_4:Float64;N]",
        );
        query_plan_compare(query, expected).await;
    }
805
    #[tokio::test]
    async fn range_multi_args() {
        // A multi-argument aggregate (covar) works in a range expression;
        // with no BY clause, all tag columns become the default align-by keys.
        let query =
            r#"SELECT (covar(field_0 + field_1, field_1)/4) RANGE '5m' FROM test ALIGN '1h';"#;
        let expected = String::from(
            "Projection: covar_samp(test.field_0 + test.field_1,test.field_1) RANGE 5m / Int64(4) [covar_samp(test.field_0 + test.field_1,test.field_1) RANGE 5m / Int64(4):Float64;N]\
            \n  RangeSelect: range_exprs=[covar_samp(test.field_0 + test.field_1,test.field_1) RANGE 5m], align=3600000ms, align_to=0ms, align_by=[test.tag_0, test.tag_1, test.tag_2, test.tag_3, test.tag_4], time_index=timestamp [covar_samp(test.field_0 + test.field_1,test.field_1) RANGE 5m:Float64;N, timestamp:Timestamp(ms), tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8]\
            \n    TableScan: test [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8, timestamp:Timestamp(ms), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N, field_3:Float64;N, field_4:Float64;N]",
        );
        query_plan_compare(query, expected).await;
    }
817
    #[tokio::test]
    async fn range_calculation() {
        // Two aggregations inside one expression become two range_exprs; the
        // FILL NULL option is attached to each of them.
        let query = r#"SELECT ((avg(field_0)+sum(field_1))/4) RANGE '5m' FROM test ALIGN '1h' by (tag_0,tag_1) FILL NULL;"#;
        let expected = String::from(
            "Projection: (avg(test.field_0) RANGE 5m FILL NULL + sum(test.field_1) RANGE 5m FILL NULL) / Int64(4) [avg(test.field_0) RANGE 5m FILL NULL + sum(test.field_1) RANGE 5m FILL NULL / Int64(4):Float64;N]\
            \n  RangeSelect: range_exprs=[avg(test.field_0) RANGE 5m FILL NULL, sum(test.field_1) RANGE 5m FILL NULL], align=3600000ms, align_to=0ms, align_by=[test.tag_0, test.tag_1], time_index=timestamp [avg(test.field_0) RANGE 5m FILL NULL:Float64;N, sum(test.field_1) RANGE 5m FILL NULL:Float64;N, timestamp:Timestamp(ms), tag_0:Utf8, tag_1:Utf8]\
            \n    TableScan: test [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8, timestamp:Timestamp(ms), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N, field_3:Float64;N, field_4:Float64;N]",
        );
        query_plan_compare(query, expected).await;
    }
828
    #[tokio::test]
    async fn range_as_sub_query() {
        // A range query used as a subquery: the outer Filter/Projection
        // operate on the aliased (`foo`) output of the rewritten range plan.
        let query = r#"SELECT foo + 1 from (SELECT ((avg(field_0)+sum(field_1))/4) RANGE '5m' as foo FROM test ALIGN '1h' by (tag_0,tag_1) FILL NULL) where foo > 1;"#;
        let expected = String::from(
            "Projection: foo + Int64(1) [foo + Int64(1):Float64;N]\
            \n  Filter: foo > Int64(1) [foo:Float64;N]\
            \n    Projection: (avg(test.field_0) RANGE 5m FILL NULL + sum(test.field_1) RANGE 5m FILL NULL) / Int64(4) AS foo [foo:Float64;N]\
            \n      RangeSelect: range_exprs=[avg(test.field_0) RANGE 5m FILL NULL, sum(test.field_1) RANGE 5m FILL NULL], align=3600000ms, align_to=0ms, align_by=[test.tag_0, test.tag_1], time_index=timestamp [avg(test.field_0) RANGE 5m FILL NULL:Float64;N, sum(test.field_1) RANGE 5m FILL NULL:Float64;N, timestamp:Timestamp(ms), tag_0:Utf8, tag_1:Utf8]\
            \n        TableScan: test [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8, timestamp:Timestamp(ms), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N, field_3:Float64;N, field_4:Float64;N]",
        );
        query_plan_compare(query, expected).await;
    }
841
    #[tokio::test]
    async fn range_from_nest_query() {
        // A range query over a derived table: the aliased columns (a, b, c, d)
        // from the inner Projection are used for aggregation and align-by.
        let query = r#"SELECT ((avg(a)+sum(b))/4) RANGE '5m' FROM (SELECT field_0 as a, field_1 as b, tag_0 as c, tag_1 as d, timestamp from test where field_0 > 1.0) ALIGN '1h' by (c, d) FILL NULL;"#;
        let expected = String::from(
            "Projection: (avg(a) RANGE 5m FILL NULL + sum(b) RANGE 5m FILL NULL) / Int64(4) [avg(a) RANGE 5m FILL NULL + sum(b) RANGE 5m FILL NULL / Int64(4):Float64;N]\
            \n  RangeSelect: range_exprs=[avg(a) RANGE 5m FILL NULL, sum(b) RANGE 5m FILL NULL], align=3600000ms, align_to=0ms, align_by=[c, d], time_index=timestamp [avg(a) RANGE 5m FILL NULL:Float64;N, sum(b) RANGE 5m FILL NULL:Float64;N, timestamp:Timestamp(ms), c:Utf8, d:Utf8]\
            \n    Projection: test.field_0 AS a, test.field_1 AS b, test.tag_0 AS c, test.tag_1 AS d, test.timestamp [a:Float64;N, b:Float64;N, c:Utf8, d:Utf8, timestamp:Timestamp(ms)]\
            \n      Filter: test.field_0 > Float64(1) [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8, timestamp:Timestamp(ms), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N, field_3:Float64;N, field_4:Float64;N]\
            \n        TableScan: test [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8, timestamp:Timestamp(ms), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N, field_3:Float64;N, field_4:Float64;N]",
        );
        query_plan_compare(query, expected).await;
    }
854
    #[tokio::test]
    async fn range_from_union_query() {
        // The same range query over a UNION ALL derived input, once without
        // and once with a table alias. Both inputs carry two timestamp-typed
        // columns (`timestamp` and `timestamp_2`).
        let queries = [
            r#"SELECT timestamp, tag_0, avg(field_0) RANGE '5m'
            FROM (
                SELECT timestamp, tag_0, field_0, timestamp_2 FROM test_0
                UNION ALL
                SELECT timestamp, tag_0, field_0, timestamp_2 FROM test_1
            )
            WHERE timestamp >= '1970-01-01 00:00:00'
            ALIGN '1h' by (tag_0)"#,
            r#"SELECT tmp.timestamp, tmp.tag_0, avg(tmp.field_0) RANGE '5m'
            FROM (
                SELECT timestamp, tag_0, field_0, timestamp_2 FROM test_0
                UNION ALL
                SELECT timestamp, tag_0, field_0, timestamp_2 FROM test_1
            ) AS tmp
            WHERE tmp.timestamp >= '1970-01-01 00:00:00'
            ALIGN '1h' by (tmp.tag_0)"#,
        ];

        for query in queries {
            let plan = do_union_query(query)
                .await
                .unwrap()
                .display_indent_schema()
                .to_string();

            // The exact plan text is not pinned; just check the key nodes are
            // present and that the time index resolved to `timestamp` despite
            // the second timestamp column.
            assert!(plan.contains("RangeSelect"));
            assert!(plan.contains("Union"));
            assert!(plan.contains("time_index=timestamp"));
        }
    }
888
    #[tokio::test]
    async fn range_from_derived_query_without_by_err() {
        // Same UNION ALL inputs as `range_from_union_query`, but without an
        // explicit BY clause: the rewriter cannot infer default BY columns
        // from a derived input and must report an error.
        let queries = [
            r#"SELECT timestamp, tag_0, avg(field_0) RANGE '5m'
            FROM (
                SELECT timestamp, tag_0, field_0, timestamp_2 FROM test_0
                UNION ALL
                SELECT timestamp, tag_0, field_0, timestamp_2 FROM test_1
            )
            WHERE timestamp >= '1970-01-01 00:00:00'
            ALIGN '1h'"#,
            r#"SELECT tmp.timestamp, tmp.tag_0, avg(tmp.field_0) RANGE '5m'
            FROM (
                SELECT timestamp, tag_0, field_0, timestamp_2 FROM test_0
                UNION ALL
                SELECT timestamp, tag_0, field_0, timestamp_2 FROM test_1
            ) AS tmp
            WHERE tmp.timestamp >= '1970-01-01 00:00:00'
            ALIGN '1h'"#,
        ];

        for query in queries {
            assert_eq!(
                do_union_query(query).await.unwrap_err().to_string(),
                "Range Query: Cannot infer default BY columns from derived range query input"
            );
        }
    }
917
    #[tokio::test]
    async fn range_in_expr() {
        // A range aggregation nested inside a scalar function call (sin) is
        // extracted into the RangeSelect; the function applies in the
        // Projection above it.
        let query = r#"SELECT sin(avg(field_0 + field_1) RANGE '5m' + 1) FROM test ALIGN '1h' by (tag_0,tag_1);"#;
        let expected = String::from(
            "Projection: sin(avg(test.field_0 + test.field_1) RANGE 5m + Int64(1)) [sin(avg(test.field_0 + test.field_1) RANGE 5m + Int64(1)):Float64;N]\
            \n  RangeSelect: range_exprs=[avg(test.field_0 + test.field_1) RANGE 5m], align=3600000ms, align_to=0ms, align_by=[test.tag_0, test.tag_1], time_index=timestamp [avg(test.field_0 + test.field_1) RANGE 5m:Float64;N, timestamp:Timestamp(ms), tag_0:Utf8, tag_1:Utf8]\
            \n    TableScan: test [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8, timestamp:Timestamp(ms), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N, field_3:Float64;N, field_4:Float64;N]",
        );
        query_plan_compare(query, expected).await;
    }
928
    #[tokio::test]
    async fn duplicate_range_expr() {
        // Identical range expressions are deduplicated: the RangeSelect
        // evaluates `avg(...) RANGE 5m FILL 6` only once even though it
        // appears twice in the projection.
        let query = r#"SELECT avg(field_0) RANGE '5m' FILL 6.0 + avg(field_0) RANGE '5m' FILL 6.0 FROM test ALIGN '1h' by (tag_0,tag_1);"#;
        let expected = String::from(
            "Projection: avg(test.field_0) RANGE 5m FILL 6 + avg(test.field_0) RANGE 5m FILL 6 [avg(test.field_0) RANGE 5m FILL 6 + avg(test.field_0) RANGE 5m FILL 6:Float64]\
            \n  RangeSelect: range_exprs=[avg(test.field_0) RANGE 5m FILL 6], align=3600000ms, align_to=0ms, align_by=[test.tag_0, test.tag_1], time_index=timestamp [avg(test.field_0) RANGE 5m FILL 6:Float64, timestamp:Timestamp(ms), tag_0:Utf8, tag_1:Utf8]\
            \n    TableScan: test [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8, timestamp:Timestamp(ms), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N, field_3:Float64;N, field_4:Float64;N]",
        );
        query_plan_compare(query, expected).await;
    }
939
    #[tokio::test]
    async fn deep_nest_range_expr() {
        // A range aggregation buried under several scalar function layers
        // (round(sin(...))) is still extracted into the RangeSelect.
        let query = r#"SELECT round(sin(avg(field_0 + field_1) RANGE '5m' + 1)) FROM test ALIGN '1h' by (tag_0,tag_1);"#;
        let expected = String::from(
            "Projection: round(sin(avg(test.field_0 + test.field_1) RANGE 5m + Int64(1))) [round(sin(avg(test.field_0 + test.field_1) RANGE 5m + Int64(1))):Float64;N]\
            \n  RangeSelect: range_exprs=[avg(test.field_0 + test.field_1) RANGE 5m], align=3600000ms, align_to=0ms, align_by=[test.tag_0, test.tag_1], time_index=timestamp [avg(test.field_0 + test.field_1) RANGE 5m:Float64;N, timestamp:Timestamp(ms), tag_0:Utf8, tag_1:Utf8]\
            \n    TableScan: test [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8, timestamp:Timestamp(ms), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N, field_3:Float64;N, field_4:Float64;N]",
        );
        query_plan_compare(query, expected).await;
    }
950
    #[tokio::test]
    async fn complex_range_expr() {
        // Three distinct range aggregations with different ranges (5m/6m/10m)
        // mixed with casts and scalar functions: each becomes its own entry in
        // range_exprs; CASTs are rendered as arrow_cast in the plan.
        let query = r#"SELECT gcd(CAST(max(field_0 + 1) Range '5m' FILL NULL AS Int64), CAST(tag_0 AS Int64)) + round(max(field_2+1) Range '6m' FILL NULL + 1) + max(field_2+3) Range '10m' FILL NULL * CAST(tag_1 AS Float64) + 1 FROM test ALIGN '1h' by (tag_0, tag_1);"#;
        let expected = String::from(
            "Projection: gcd(arrow_cast(max(test.field_0 + Int64(1)) RANGE 5m FILL NULL, Utf8(\"Int64\")), arrow_cast(test.tag_0, Utf8(\"Int64\"))) + round(max(test.field_2 + Int64(1)) RANGE 6m FILL NULL + Int64(1)) + max(test.field_2 + Int64(3)) RANGE 10m FILL NULL * arrow_cast(test.tag_1, Utf8(\"Float64\")) + Int64(1) [gcd(arrow_cast(max(test.field_0 + Int64(1)) RANGE 5m FILL NULL,Utf8(\"Int64\")),arrow_cast(test.tag_0,Utf8(\"Int64\"))) + round(max(test.field_2 + Int64(1)) RANGE 6m FILL NULL + Int64(1)) + max(test.field_2 + Int64(3)) RANGE 10m FILL NULL * arrow_cast(test.tag_1,Utf8(\"Float64\")) + Int64(1):Float64;N]\
            \n  RangeSelect: range_exprs=[max(test.field_0 + Int64(1)) RANGE 5m FILL NULL, max(test.field_2 + Int64(1)) RANGE 6m FILL NULL, max(test.field_2 + Int64(3)) RANGE 10m FILL NULL], align=3600000ms, align_to=0ms, align_by=[test.tag_0, test.tag_1], time_index=timestamp [max(test.field_0 + Int64(1)) RANGE 5m FILL NULL:Float64;N, max(test.field_2 + Int64(1)) RANGE 6m FILL NULL:Float64;N, max(test.field_2 + Int64(3)) RANGE 10m FILL NULL:Float64;N, timestamp:Timestamp(ms), tag_0:Utf8, tag_1:Utf8]\
            \n    TableScan: test [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8, timestamp:Timestamp(ms), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N, field_3:Float64;N, field_4:Float64;N]",
        );
        query_plan_compare(query, expected).await;
    }
961
    #[tokio::test]
    async fn range_linear_on_integer() {
        // FILL LINEAR over integer-cast inputs: the output column type becomes
        // nullable Float64 (linear interpolation requires a float result).
        let query = r#"SELECT min(CAST(field_0 AS Int64) + CAST(field_1 AS Int64)) RANGE '5m' FILL LINEAR FROM test ALIGN '1h' by (tag_0,tag_1);"#;
        let expected = String::from(
            "RangeSelect: range_exprs=[min(arrow_cast(test.field_0,Utf8(\"Int64\")) + arrow_cast(test.field_1,Utf8(\"Int64\"))) RANGE 5m FILL LINEAR], align=3600000ms, align_to=0ms, align_by=[test.tag_0, test.tag_1], time_index=timestamp [min(arrow_cast(test.field_0,Utf8(\"Int64\")) + arrow_cast(test.field_1,Utf8(\"Int64\"))) RANGE 5m FILL LINEAR:Float64;N]\
            \n  TableScan: test [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8, timestamp:Timestamp(ms), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N, field_3:Float64;N, field_4:Float64;N]",
        );
        query_plan_compare(query, expected).await;
    }
971
972    #[tokio::test]
973    async fn range_nest_range_err() {
974        let query = r#"SELECT sum(avg(field_0 + field_1) RANGE '5m' + 1) RANGE '5m' + 1 FROM test ALIGN '1h' by (tag_0,tag_1);"#;
975        assert_eq!(
976            do_query(query).await.unwrap_err().to_string(),
977            "Range Query: Nest Range Query is not allowed"
978        )
979    }
980
981    #[tokio::test]
982    /// Start directly from the rewritten SQL and check whether the error reported by the range expression rewriting is as expected.
983    /// the right argument is `range_fn(avg(field_0), '5m', 'NULL', '0', '1h')`
984    async fn range_argument_err_1() {
985        let query = r#"SELECT range_fn('5m', avg(field_0), 'NULL', '1', tag_0, '1h') FROM test group by tag_0;"#;
986        let error = do_query(query).await.unwrap_err().to_string();
987        assert_eq!(
988            error,
989            "Error during planning: Illegal argument `Utf8(\"5m\")` in range select query"
990        )
991    }
992
993    #[tokio::test]
994    async fn range_argument_err_2() {
995        let query = r#"SELECT range_fn(avg(field_0), 5, 'NULL', '1', tag_0, '1h') FROM test group by tag_0;"#;
996        let error = do_query(query).await.unwrap_err().to_string();
997        assert_eq!(
998            error,
999            "Error during planning: Illegal argument `Int64(5)` in range select query"
1000        )
1001    }
1002
    #[test]
    /// Covers every literal/expression form `parse_duration_expr` accepts or
    /// rejects when extracting the RANGE / ALIGN duration argument.
    fn test_parse_duration_expr() {
        // A year-month interval has no fixed millisecond length, so it is rejected.
        let interval = IntervalYearMonth::new(10);
        let args = vec![ScalarValue::IntervalYearMonth(Some(interval.to_i32())).lit()];
        assert!(parse_duration_expr(&args, 0).is_err(),);
        // A day-time interval converts directly to a millisecond duration.
        let interval = IntervalDayTime::new(10, 10);
        let args = vec![ScalarValue::IntervalDayTime(Some(interval.into())).lit()];
        assert_eq!(
            parse_duration_expr(&args, 0).unwrap().as_millis() as i64,
            interval.as_millis()
        );
        // Month-day-nano with zero months: days and nanoseconds fold into millis.
        let interval = IntervalMonthDayNano::new(0, 10, 10);
        let args = vec![ScalarValue::IntervalMonthDayNano(Some(interval.into())).lit()];
        assert_eq!(
            parse_duration_expr(&args, 0).unwrap().as_millis() as i64,
            interval.days as i64 * MS_PER_DAY + interval.nanoseconds / NANOS_PER_MILLI,
        );
        // A PromQL-style duration string matches promql_parser's parse_duration.
        let args = vec!["1y4w".lit()];
        assert_eq!(
            parse_duration_expr(&args, 0).unwrap(),
            parse_duration("1y4w").unwrap()
        );
        // CAST('15 minutes' AS INTERVAL) is evaluated before parsing.
        let args = vec![Expr::Cast(Cast {
            expr: Box::new("15 minutes".lit()),
            data_type: DataType::Interval(IntervalUnit::MonthDayNano),
        })];
        assert_eq!(
            parse_duration_expr(&args, 0).unwrap(),
            parse_duration("15m").unwrap()
        );
        // An out-of-range argument index is an error.
        assert!(parse_duration_expr(&args, 10).is_err());
        // Interval arithmetic (10ms + 10ms) is constant-folded before parsing.
        let args = vec![Expr::BinaryExpr(BinaryExpr {
            left: Box::new(
                ScalarValue::IntervalDayTime(Some(IntervalDayTime::new(0, 10).into())).lit(),
            ),
            op: Operator::Plus,
            right: Box::new(
                ScalarValue::IntervalDayTime(Some(IntervalDayTime::new(0, 10).into())).lit(),
            ),
        })];
        assert_eq!(
            parse_duration_expr(&args, 0).unwrap(),
            Duration::from_millis(20)
        );
        let args = vec![Expr::BinaryExpr(BinaryExpr {
            left: Box::new(
                ScalarValue::IntervalDayTime(Some(IntervalDayTime::new(0, 10).into())).lit(),
            ),
            op: Operator::Minus,
            right: Box::new(
                ScalarValue::IntervalDayTime(Some(IntervalDayTime::new(0, 10).into())).lit(),
            ),
        })];
        // 10ms - 10ms folds to a zero interval, which is not a valid duration.
        assert!(parse_duration_expr(&args, 0).is_err());
        // Every operand of the expression must be an interval; a time value is not.
        let args = vec![Expr::BinaryExpr(BinaryExpr {
            left: Box::new(
                ScalarValue::IntervalYearMonth(Some(IntervalYearMonth::new(10).to_i32())).lit(),
            ),
            op: Operator::Minus,
            right: Box::new(ScalarValue::Time64Microsecond(Some(0)).lit()),
        })];
        assert!(parse_duration_expr(&args, 0).is_err());
    }
1075
1076    #[test]
1077    fn test_parse_align_to() {
1078        // test NOW
1079        let args = vec!["NOW".lit()];
1080        let epsinon = parse_align_to(&args, 0, None).unwrap() - Timestamp::current_millis().value();
1081        assert!(epsinon.abs() < 100);
1082        // test default
1083        let args = vec!["".lit()];
1084        assert_eq!(0, parse_align_to(&args, 0, None).unwrap());
1085        // test default with timezone
1086        let args = vec!["".lit()];
1087        assert_eq!(
1088            -36000 * 1000,
1089            parse_align_to(&args, 0, Some(&Timezone::from_tz_string("HST").unwrap())).unwrap()
1090        );
1091        assert_eq!(
1092            28800 * 1000,
1093            parse_align_to(
1094                &args,
1095                0,
1096                Some(&Timezone::from_tz_string("Asia/Shanghai").unwrap())
1097            )
1098            .unwrap()
1099        );
1100
1101        // test Timestamp
1102        let args = vec!["1970-01-01T00:00:00+08:00".lit()];
1103        assert_eq!(parse_align_to(&args, 0, None).unwrap(), -8 * 60 * 60 * 1000);
1104        // timezone
1105        let args = vec!["1970-01-01T00:00:00".lit()];
1106        assert_eq!(
1107            parse_align_to(
1108                &args,
1109                0,
1110                Some(&Timezone::from_tz_string("Asia/Shanghai").unwrap())
1111            )
1112            .unwrap(),
1113            -8 * 60 * 60 * 1000
1114        );
1115        // test evaluate expr
1116        let args = vec![Expr::BinaryExpr(BinaryExpr {
1117            left: Box::new(
1118                ScalarValue::IntervalDayTime(Some(IntervalDayTime::new(0, 10).into())).lit(),
1119            ),
1120            op: Operator::Plus,
1121            right: Box::new(
1122                ScalarValue::IntervalDayTime(Some(IntervalDayTime::new(0, 10).into())).lit(),
1123            ),
1124        })];
1125        assert_eq!(parse_align_to(&args, 0, None).unwrap(), 20);
1126    }
1127
1128    #[test]
1129    fn test_interval_only() {
1130        let expr = Expr::BinaryExpr(BinaryExpr {
1131            left: Box::new(ScalarValue::DurationMillisecond(Some(20)).lit()),
1132            op: Operator::Minus,
1133            right: Box::new(
1134                ScalarValue::IntervalDayTime(Some(IntervalDayTime::new(10, 0).into())).lit(),
1135            ),
1136        });
1137        assert!(!interval_only_in_expr(&expr));
1138        let expr = Expr::BinaryExpr(BinaryExpr {
1139            left: Box::new(
1140                ScalarValue::IntervalDayTime(Some(IntervalDayTime::new(10, 0).into())).lit(),
1141            ),
1142            op: Operator::Minus,
1143            right: Box::new(
1144                ScalarValue::IntervalDayTime(Some(IntervalDayTime::new(10, 0).into())).lit(),
1145            ),
1146        });
1147        assert!(interval_only_in_expr(&expr));
1148
1149        let expr = Expr::BinaryExpr(BinaryExpr {
1150            left: Box::new(Expr::Cast(Cast {
1151                expr: Box::new("15 minute".lit()),
1152                data_type: DataType::Interval(IntervalUnit::MonthDayNano),
1153            })),
1154            op: Operator::Minus,
1155            right: Box::new(
1156                ScalarValue::IntervalDayTime(Some(IntervalDayTime::new(10, 0).into())).lit(),
1157            ),
1158        });
1159        assert!(interval_only_in_expr(&expr));
1160
1161        let expr = Expr::Cast(Cast {
1162            expr: Box::new(Expr::BinaryExpr(BinaryExpr {
1163                left: Box::new(Expr::Cast(Cast {
1164                    expr: Box::new("15 minute".lit()),
1165                    data_type: DataType::Interval(IntervalUnit::MonthDayNano),
1166                })),
1167                op: Operator::Minus,
1168                right: Box::new(
1169                    ScalarValue::IntervalDayTime(Some(IntervalDayTime::new(10, 0).into())).lit(),
1170                ),
1171            })),
1172            data_type: DataType::Interval(IntervalUnit::MonthDayNano),
1173        });
1174
1175        assert!(interval_only_in_expr(&expr));
1176    }
1177}