query/range_select/
plan_rewrite.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::BTreeSet;
16use std::sync::Arc;
17use std::time::Duration;
18
19use arrow_schema::DataType;
20use async_recursion::async_recursion;
21use catalog::table_source::DfTableSourceProvider;
22use chrono::Utc;
23use common_time::interval::{MS_PER_DAY, NANOS_PER_MILLI};
24use common_time::timestamp::TimeUnit;
25use common_time::{IntervalDayTime, IntervalMonthDayNano, IntervalYearMonth, Timestamp, Timezone};
26use datafusion::datasource::DefaultTableSource;
27use datafusion::prelude::Column;
28use datafusion::scalar::ScalarValue;
29use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRecursion, TreeNodeRewriter};
30use datafusion_common::{DFSchema, DataFusionError, Result as DFResult};
31use datafusion_expr::execution_props::ExecutionProps;
32use datafusion_expr::expr::WildcardOptions;
33use datafusion_expr::simplify::SimplifyContext;
34use datafusion_expr::{
35    Aggregate, Analyze, Cast, Distinct, DistinctOn, Explain, Expr, ExprSchemable, Extension,
36    LogicalPlan, LogicalPlanBuilder, Projection,
37};
38use datafusion_optimizer::simplify_expressions::ExprSimplifier;
39use datatypes::prelude::ConcreteDataType;
40use promql_parser::util::parse_duration;
41use session::context::QueryContextRef;
42use snafu::{ensure, OptionExt, ResultExt};
43use table::table::adapter::DfTableProviderAdapter;
44
45use crate::error::{
46    CatalogSnafu, RangeQuerySnafu, Result, TimeIndexNotFoundSnafu, UnknownTableSnafu,
47};
48use crate::plan::ExtractExpr;
49use crate::range_select::plan::{Fill, RangeFn, RangeSelect};
50
51/// `RangeExprRewriter` will recursively search certain `Expr`, find all `range_fn` scalar udf contained in `Expr`,
52/// and collect the information required by the RangeSelect query,
53/// and finally modify the `range_fn` scalar udf to an ordinary column field.
54pub struct RangeExprRewriter<'a> {
55    input_plan: &'a Arc<LogicalPlan>,
56    align: Duration,
57    align_to: i64,
58    by: Vec<Expr>,
59    /// Use `BTreeSet` to avoid in case like `avg(a) RANGE '5m' + avg(a) RANGE '5m'`, duplicate range expr `avg(a) RANGE '5m'` be calculate twice
60    range_fn: BTreeSet<RangeFn>,
61    sub_aggr: &'a Aggregate,
62    query_ctx: &'a QueryContextRef,
63}
64
65impl RangeExprRewriter<'_> {
66    pub fn get_range_expr(&self, args: &[Expr], i: usize) -> DFResult<Expr> {
67        match args.get(i) {
68            Some(Expr::Column(column)) => {
69                let index = self.sub_aggr.schema.index_of_column(column)?;
70                let len = self.sub_aggr.group_expr.len();
71                self.sub_aggr
72                    .aggr_expr
73                    .get(index - len)
74                    .cloned()
75                    .ok_or(DataFusionError::Plan(
76                        "Range expr not found in underlying Aggregate Plan".into(),
77                    ))
78            }
79            other => Err(dispose_parse_error(other)),
80        }
81    }
82}
83
84#[inline]
85fn dispose_parse_error(expr: Option<&Expr>) -> DataFusionError {
86    DataFusionError::Plan(
87        expr.map(|x| {
88            format!(
89                "Illegal argument `{}` in range select query",
90                x.schema_name()
91            )
92        })
93        .unwrap_or("Missing argument in range select query".into()),
94    )
95}
96
97fn parse_str_expr(args: &[Expr], i: usize) -> DFResult<&str> {
98    match args.get(i) {
99        Some(Expr::Literal(ScalarValue::Utf8(Some(str)))) => Ok(str.as_str()),
100        other => Err(dispose_parse_error(other)),
101    }
102}
103
104fn parse_expr_to_string(args: &[Expr], i: usize) -> DFResult<String> {
105    match args.get(i) {
106        Some(Expr::Literal(ScalarValue::Utf8(Some(str)))) => Ok(str.to_string()),
107        Some(expr) => Ok(expr.schema_name().to_string()),
108        None => Err(dispose_parse_error(None)),
109    }
110}
111
112/// Parse a duraion expr:
113/// 1. duration string (e.g. `'1h'`)
114/// 2. Interval expr (e.g. `INTERVAL '1 year 3 hours 20 minutes'`)
115/// 3. An interval expr can be evaluated at the logical plan stage (e.g. `INTERVAL '2' day - INTERVAL '1' day`)
116fn parse_duration_expr(args: &[Expr], i: usize) -> DFResult<Duration> {
117    match args.get(i) {
118        Some(Expr::Literal(ScalarValue::Utf8(Some(str)))) => {
119            parse_duration(str).map_err(DataFusionError::Plan)
120        }
121        Some(expr) => {
122            let ms = evaluate_expr_to_millisecond(args, i, true)?;
123            if ms <= 0 {
124                return Err(dispose_parse_error(Some(expr)));
125            }
126            Ok(Duration::from_millis(ms as u64))
127        }
128        None => Err(dispose_parse_error(None)),
129    }
130}
131
132/// Evaluate a time calculation expr, case like:
133/// 1. `INTERVAL '1' day + INTERVAL '1 year 2 hours 3 minutes'`
134/// 2. `now() - INTERVAL '1' day` (when `interval_only==false`)
135///
136/// Output a millisecond timestamp
137///
138/// if `interval_only==true`, only accept expr with all interval type (case 2 will return a error)
139fn evaluate_expr_to_millisecond(args: &[Expr], i: usize, interval_only: bool) -> DFResult<i64> {
140    let Some(expr) = args.get(i) else {
141        return Err(dispose_parse_error(None));
142    };
143    if interval_only && !interval_only_in_expr(expr) {
144        return Err(dispose_parse_error(Some(expr)));
145    }
146    let execution_props = ExecutionProps::new().with_query_execution_start_time(Utc::now());
147    let info = SimplifyContext::new(&execution_props).with_schema(Arc::new(DFSchema::empty()));
148    let simplify_expr = ExprSimplifier::new(info).simplify(expr.clone())?;
149    match simplify_expr {
150        Expr::Literal(ScalarValue::TimestampNanosecond(ts_nanos, _))
151        | Expr::Literal(ScalarValue::DurationNanosecond(ts_nanos)) => {
152            ts_nanos.map(|v| v / 1_000_000)
153        }
154        Expr::Literal(ScalarValue::TimestampMicrosecond(ts_micros, _))
155        | Expr::Literal(ScalarValue::DurationMicrosecond(ts_micros)) => {
156            ts_micros.map(|v| v / 1_000)
157        }
158        Expr::Literal(ScalarValue::TimestampMillisecond(ts_millis, _))
159        | Expr::Literal(ScalarValue::DurationMillisecond(ts_millis)) => ts_millis,
160        Expr::Literal(ScalarValue::TimestampSecond(ts_secs, _))
161        | Expr::Literal(ScalarValue::DurationSecond(ts_secs)) => ts_secs.map(|v| v * 1_000),
162        // We don't support interval with months as days in a month is unclear.
163        Expr::Literal(ScalarValue::IntervalYearMonth(interval)) => interval
164            .map(|v| {
165                let interval = IntervalYearMonth::from_i32(v);
166                if interval.months != 0 {
167                    return Err(DataFusionError::Plan(format!(
168                        "Year or month interval is not allowed in range query: {}",
169                        expr.schema_name()
170                    )));
171                }
172
173                Ok(0)
174            })
175            .transpose()?,
176        Expr::Literal(ScalarValue::IntervalDayTime(interval)) => interval.map(|v| {
177            let interval = IntervalDayTime::from(v);
178            interval.as_millis()
179        }),
180        Expr::Literal(ScalarValue::IntervalMonthDayNano(interval)) => interval
181            .map(|v| {
182                let interval = IntervalMonthDayNano::from(v);
183                if interval.months != 0 {
184                    return Err(DataFusionError::Plan(format!(
185                        "Year or month interval is not allowed in range query: {}",
186                        expr.schema_name()
187                    )));
188                }
189
190                Ok(interval.days as i64 * MS_PER_DAY + interval.nanoseconds / NANOS_PER_MILLI)
191            })
192            .transpose()?,
193        _ => None,
194    }
195    .ok_or_else(|| {
196        DataFusionError::Plan(format!(
197            "{} is not a expr can be evaluate and use in range query",
198            expr.schema_name()
199        ))
200    })
201}
202
203/// Parse the `align to` clause and return a UTC timestamp with unit of millisecond,
204/// which is used as the basis for dividing time slot during the align operation.
205/// 1. NOW: align to current execute time
206/// 2. Timestamp string: align to specific timestamp
207/// 3. An expr can be evaluated at the logical plan stage (e.g. `now() - INTERVAL '1' day`)
208/// 4. leave empty (as Default Option): align to unix epoch 0 (timezone aware)
209fn parse_align_to(args: &[Expr], i: usize, timezone: Option<&Timezone>) -> DFResult<i64> {
210    let Ok(s) = parse_str_expr(args, i) else {
211        return evaluate_expr_to_millisecond(args, i, false);
212    };
213    let upper = s.to_uppercase();
214    match upper.as_str() {
215        "NOW" => return Ok(Timestamp::current_millis().value()),
216        // default align to unix epoch 0 (timezone aware)
217        "" => return Ok(timezone.map(|tz| tz.local_minus_utc() * 1000).unwrap_or(0)),
218        _ => (),
219    }
220
221    Timestamp::from_str(s, timezone)
222        .map_err(|e| {
223            DataFusionError::Plan(format!(
224                "Illegal `align to` argument `{}` in range select query, can't be parse as NOW/CALENDAR/Timestamp, error: {}",
225                s, e
226            ))
227        })?.convert_to(TimeUnit::Millisecond).map(|x|x.value()).ok_or(DataFusionError::Plan(format!(
228            "Illegal `align to` argument `{}` in range select query, can't be convert to a valid Timestamp",
229            s
230        ))
231        )
232}
233
234fn parse_expr_list(args: &[Expr], start: usize, len: usize) -> DFResult<Vec<Expr>> {
235    let mut outs = Vec::with_capacity(len);
236    for i in start..start + len {
237        outs.push(match &args.get(i) {
238            Some(
239                Expr::Column(_) | Expr::Literal(_) | Expr::BinaryExpr(_) | Expr::ScalarFunction(_),
240            ) => args[i].clone(),
241            other => {
242                return Err(dispose_parse_error(*other));
243            }
244        });
245    }
246    Ok(outs)
247}
248
249macro_rules! inconsistent_check {
250    ($self: ident.$name: ident, $cond: expr) => {
251        if $cond && $self.$name != $name {
252            return Err(DataFusionError::Plan(
253                concat!(
254                    "Inconsistent ",
255                    stringify!($name),
256                    " given in Range Function Rewrite"
257                )
258                .into(),
259            ));
260        } else {
261            $self.$name = $name;
262        }
263    };
264}
265
266impl TreeNodeRewriter for RangeExprRewriter<'_> {
267    type Node = Expr;
268
269    fn f_down(&mut self, node: Expr) -> DFResult<Transformed<Expr>> {
270        if let Expr::ScalarFunction(func) = &node {
271            if func.name() == "range_fn" {
272                // `range_fn(func, range, fill, byc, [byv], align, to)`
273                // `[byv]` are variadic arguments, byc indicate the length of arguments
274                let range_expr = self.get_range_expr(&func.args, 0)?;
275                let range = parse_duration_expr(&func.args, 1)?;
276                let byc = str::parse::<usize>(parse_str_expr(&func.args, 3)?)
277                    .map_err(|e| DataFusionError::Plan(e.to_string()))?;
278                let by = parse_expr_list(&func.args, 4, byc)?;
279                let align = parse_duration_expr(&func.args, byc + 4)?;
280                let align_to =
281                    parse_align_to(&func.args, byc + 5, Some(&self.query_ctx.timezone()))?;
282                let mut data_type = range_expr.get_type(self.input_plan.schema())?;
283                let mut need_cast = false;
284                let fill = Fill::try_from_str(parse_str_expr(&func.args, 2)?, &data_type)?;
285                if matches!(fill, Some(Fill::Linear)) && data_type.is_integer() {
286                    data_type = DataType::Float64;
287                    need_cast = true;
288                }
289                inconsistent_check!(self.by, !self.by.is_empty());
290                inconsistent_check!(self.align, self.align != Duration::default());
291                inconsistent_check!(self.align_to, self.align_to != 0);
292                let range_fn = RangeFn {
293                    name: if let Some(fill) = &fill {
294                        format!(
295                            "{} RANGE {} FILL {}",
296                            range_expr.schema_name(),
297                            parse_expr_to_string(&func.args, 1)?,
298                            fill
299                        )
300                    } else {
301                        format!(
302                            "{} RANGE {}",
303                            range_expr.schema_name(),
304                            parse_expr_to_string(&func.args, 1)?,
305                        )
306                    },
307                    data_type,
308                    expr: range_expr,
309                    range,
310                    fill,
311                    need_cast,
312                };
313                let alias = Expr::Column(Column::from_name(range_fn.name.clone()));
314                self.range_fn.insert(range_fn);
315                return Ok(Transformed::yes(alias));
316            }
317        }
318        Ok(Transformed::no(node))
319    }
320}
321
322/// In order to implement RangeSelect query like `avg(field_0) RANGE '5m' FILL NULL`,
323/// All RangeSelect query items are converted into udf scalar function in sql parse stage, with format like `range_fn(avg(field_0), .....)`.
324/// `range_fn` contains all the parameters we need to execute RangeSelect.
325/// In order to correctly execute the query process of range select, we need to modify the query plan generated by datafusion.
326/// We need to recursively find the entire LogicalPlan, and find all `range_fn` scalar udf contained in the project plan,
327/// collecting info we need to generate RangeSelect Query LogicalPlan and rewrite th original LogicalPlan.
328pub struct RangePlanRewriter {
329    table_provider: DfTableSourceProvider,
330    query_ctx: QueryContextRef,
331}
332
333impl RangePlanRewriter {
334    pub fn new(table_provider: DfTableSourceProvider, query_ctx: QueryContextRef) -> Self {
335        Self {
336            table_provider,
337            query_ctx,
338        }
339    }
340
341    pub async fn rewrite(&mut self, plan: LogicalPlan) -> Result<LogicalPlan> {
342        match self.rewrite_logical_plan(&plan).await? {
343            Some(new_plan) => Ok(new_plan),
344            None => Ok(plan),
345        }
346    }
347
348    #[async_recursion]
349    async fn rewrite_logical_plan(&mut self, plan: &LogicalPlan) -> Result<Option<LogicalPlan>> {
350        let inputs = plan.inputs();
351        let mut new_inputs = Vec::with_capacity(inputs.len());
352        for input in &inputs {
353            new_inputs.push(self.rewrite_logical_plan(input).await?)
354        }
355        match plan {
356            LogicalPlan::Projection(Projection { expr, input, .. })
357                if have_range_in_exprs(expr) =>
358            {
359                let (aggr_plan, input) = if let LogicalPlan::Aggregate(aggr) = input.as_ref() {
360                    // Expr like `rate(max(a) RANGE '6m') RANGE '6m'` have legal syntax but illegal semantic.
361                    if have_range_in_exprs(&aggr.aggr_expr) {
362                        return RangeQuerySnafu {
363                            msg: "Nest Range Query is not allowed",
364                        }
365                        .fail();
366                    }
367                    (aggr, aggr.input.clone())
368                } else {
369                    return RangeQuerySnafu {
370                        msg: "Window functions is not allowed in Range Query",
371                    }
372                    .fail();
373                };
374                let (time_index, default_by) = self.get_index_by(input.schema()).await?;
375                let mut range_rewriter = RangeExprRewriter {
376                    input_plan: &input,
377                    align: Duration::default(),
378                    align_to: 0,
379                    by: vec![],
380                    range_fn: BTreeSet::new(),
381                    sub_aggr: aggr_plan,
382                    query_ctx: &self.query_ctx,
383                };
384                let new_expr = expr
385                    .iter()
386                    .map(|expr| expr.clone().rewrite(&mut range_rewriter).map(|x| x.data))
387                    .collect::<DFResult<Vec<_>>>()?;
388                if range_rewriter.by.is_empty() {
389                    range_rewriter.by = default_by;
390                }
391                let range_select = RangeSelect::try_new(
392                    input.clone(),
393                    range_rewriter.range_fn.into_iter().collect(),
394                    range_rewriter.align,
395                    range_rewriter.align_to,
396                    time_index,
397                    range_rewriter.by,
398                    &new_expr,
399                )?;
400                let no_additional_project = range_select.schema_project.is_some();
401                let range_plan = LogicalPlan::Extension(Extension {
402                    node: Arc::new(range_select),
403                });
404                if no_additional_project {
405                    Ok(Some(range_plan))
406                } else {
407                    let project_plan = LogicalPlanBuilder::from(range_plan)
408                        .project(new_expr)
409                        .and_then(|x| x.build())?;
410                    Ok(Some(project_plan))
411                }
412            }
413            _ => {
414                if new_inputs.iter().any(|x| x.is_some()) {
415                    let inputs: Vec<LogicalPlan> = new_inputs
416                        .into_iter()
417                        .zip(inputs)
418                        .map(|(x, y)| match x {
419                            Some(plan) => plan,
420                            None => y.clone(),
421                        })
422                        .collect();
423                    // Due to the limitations of Datafusion, for `LogicalPlan::Analyze` and `LogicalPlan::Explain`,
424                    // directly using the method `with_new_inputs` to rebuild a new `LogicalPlan` will cause an error,
425                    // so here we directly use the `LogicalPlanBuilder` to build a new plan.
426                    let plan = match plan {
427                        LogicalPlan::Analyze(Analyze { verbose, .. }) => {
428                            ensure!(
429                                inputs.len() == 1,
430                                RangeQuerySnafu {
431                                    msg: "Illegal subplan nums when rewrite Analyze logical plan",
432                                }
433                            );
434                            LogicalPlanBuilder::from(inputs[0].clone())
435                                .explain(*verbose, true)?
436                                .build()
437                        }
438                        LogicalPlan::Explain(Explain { verbose, .. }) => {
439                            ensure!(
440                                inputs.len() == 1,
441                                RangeQuerySnafu {
442                                    msg: "Illegal subplan nums when rewrite Explain logical plan",
443                                }
444                            );
445                            LogicalPlanBuilder::from(inputs[0].clone())
446                                .explain(*verbose, false)?
447                                .build()
448                        }
449                        LogicalPlan::Distinct(Distinct::On(DistinctOn {
450                            on_expr,
451                            select_expr,
452                            sort_expr,
453                            ..
454                        })) => {
455                            ensure!(
456                                inputs.len() == 1,
457                                RangeQuerySnafu {
458                                    msg:
459                                        "Illegal subplan nums when rewrite DistinctOn logical plan",
460                                }
461                            );
462                            LogicalPlanBuilder::from(inputs[0].clone())
463                                .distinct_on(
464                                    on_expr.clone(),
465                                    select_expr.clone(),
466                                    sort_expr.clone(),
467                                )?
468                                .build()
469                        }
470                        _ => plan.with_new_exprs(plan.expressions_consider_join(), inputs),
471                    }?;
472                    Ok(Some(plan))
473                } else {
474                    Ok(None)
475                }
476            }
477        }
478    }
479
480    /// this function use to find the time_index column and row columns from input schema,
481    /// return `(time_index, [row_columns])` to the rewriter.
482    /// If the user does not explicitly use the `by` keyword to indicate time series,
483    /// `[row_columns]` will be use as default time series
484    async fn get_index_by(&mut self, schema: &Arc<DFSchema>) -> Result<(Expr, Vec<Expr>)> {
485        let mut time_index_expr = Expr::Wildcard {
486            qualifier: None,
487            options: Box::new(WildcardOptions::default()),
488        };
489        let mut default_by = vec![];
490        for i in 0..schema.fields().len() {
491            let (qualifier, _) = schema.qualified_field(i);
492            if let Some(table_ref) = qualifier {
493                let table = self
494                    .table_provider
495                    .resolve_table(table_ref.clone())
496                    .await
497                    .context(CatalogSnafu)?
498                    .as_any()
499                    .downcast_ref::<DefaultTableSource>()
500                    .context(UnknownTableSnafu)?
501                    .table_provider
502                    .as_any()
503                    .downcast_ref::<DfTableProviderAdapter>()
504                    .context(UnknownTableSnafu)?
505                    .table();
506                let schema = table.schema();
507                let time_index_column =
508                    schema
509                        .timestamp_column()
510                        .with_context(|| TimeIndexNotFoundSnafu {
511                            table: table_ref.to_string(),
512                        })?;
513                // assert time_index's datatype is timestamp
514                if let ConcreteDataType::Timestamp(_) = time_index_column.data_type {
515                    default_by = table
516                        .table_info()
517                        .meta
518                        .row_key_column_names()
519                        .map(|key| Expr::Column(Column::new(Some(table_ref.clone()), key)))
520                        .collect();
521                    // If the user does not specify a primary key when creating a table,
522                    // then by default all data will be aggregated into one time series,
523                    // which is equivalent to using `by(1)` in SQL
524                    if default_by.is_empty() {
525                        default_by = vec![Expr::Literal(ScalarValue::Int64(Some(1)))];
526                    }
527                    time_index_expr = Expr::Column(Column::new(
528                        Some(table_ref.clone()),
529                        time_index_column.name.clone(),
530                    ));
531                }
532            }
533        }
534        if matches!(time_index_expr, Expr::Wildcard { .. }) {
535            TimeIndexNotFoundSnafu {
536                table: schema.to_string(),
537            }
538            .fail()
539        } else {
540            Ok((time_index_expr, default_by))
541        }
542    }
543}
544
545fn have_range_in_exprs(exprs: &[Expr]) -> bool {
546    exprs.iter().any(|expr| {
547        let mut find_range = false;
548        let _ = expr.apply(|expr| {
549            Ok(match expr {
550                Expr::ScalarFunction(func) if func.name() == "range_fn" => {
551                    find_range = true;
552                    TreeNodeRecursion::Stop
553                }
554                _ => TreeNodeRecursion::Continue,
555            })
556        });
557        find_range
558    })
559}
560
561fn interval_only_in_expr(expr: &Expr) -> bool {
562    let mut all_interval = true;
563    let _ = expr.apply(|expr| {
564        // A cast expression for an interval.
565        if matches!(
566            expr,
567            Expr::Cast(Cast{
568                expr,
569                data_type: DataType::Interval(_)
570            }) if matches!(&**expr, Expr::Literal(ScalarValue::Utf8(_)))
571        ) {
572            // Stop checking the sub `expr`,
573            // which is a `Utf8` type and has already been tested above.
574            return Ok(TreeNodeRecursion::Stop);
575        }
576
577        if !matches!(
578            expr,
579            Expr::Literal(ScalarValue::IntervalDayTime(_))
580                | Expr::Literal(ScalarValue::IntervalMonthDayNano(_))
581                | Expr::Literal(ScalarValue::IntervalYearMonth(_))
582                | Expr::BinaryExpr(_)
583                | Expr::Cast(Cast {
584                    data_type: DataType::Interval(_),
585                    ..
586                })
587        ) {
588            all_interval = false;
589            Ok(TreeNodeRecursion::Stop)
590        } else {
591            Ok(TreeNodeRecursion::Continue)
592        }
593    });
594
595    all_interval
596}
597
598#[cfg(test)]
599mod test {
600
601    use arrow::datatypes::IntervalUnit;
602    use catalog::memory::MemoryCatalogManager;
603    use catalog::RegisterTableRequest;
604    use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
605    use common_time::IntervalYearMonth;
606    use datafusion_expr::{BinaryExpr, Operator};
607    use datatypes::prelude::ConcreteDataType;
608    use datatypes::schema::{ColumnSchema, Schema};
609    use session::context::QueryContext;
610    use table::metadata::{TableInfoBuilder, TableMetaBuilder};
611    use table::test_util::EmptyTable;
612
613    use super::*;
614    use crate::options::QueryOptions;
615    use crate::parser::QueryLanguageParser;
616    use crate::{QueryEngineFactory, QueryEngineRef};
617
618    async fn create_test_engine() -> QueryEngineRef {
619        let table_name = "test".to_string();
620        let mut columns = vec![];
621        for i in 0..5 {
622            columns.push(ColumnSchema::new(
623                format!("tag_{i}"),
624                ConcreteDataType::string_datatype(),
625                false,
626            ));
627        }
628        columns.push(
629            ColumnSchema::new(
630                "timestamp".to_string(),
631                ConcreteDataType::timestamp_millisecond_datatype(),
632                false,
633            )
634            .with_time_index(true),
635        );
636        for i in 0..5 {
637            columns.push(ColumnSchema::new(
638                format!("field_{i}"),
639                ConcreteDataType::float64_datatype(),
640                true,
641            ));
642        }
643        let schema = Arc::new(Schema::new(columns));
644        let table_meta = TableMetaBuilder::empty()
645            .schema(schema)
646            .primary_key_indices((0..5).collect())
647            .value_indices((6..11).collect())
648            .next_column_id(1024)
649            .build()
650            .unwrap();
651        let table_info = TableInfoBuilder::default()
652            .name(&table_name)
653            .meta(table_meta)
654            .build()
655            .unwrap();
656        let table = EmptyTable::from_table_info(&table_info);
657        let catalog_list = MemoryCatalogManager::with_default_setup();
658        assert!(catalog_list
659            .register_table_sync(RegisterTableRequest {
660                catalog: DEFAULT_CATALOG_NAME.to_string(),
661                schema: DEFAULT_SCHEMA_NAME.to_string(),
662                table_name,
663                table_id: 1024,
664                table,
665            })
666            .is_ok());
667        QueryEngineFactory::new(
668            catalog_list,
669            None,
670            None,
671            None,
672            None,
673            false,
674            QueryOptions::default(),
675        )
676        .query_engine()
677    }
678
679    async fn do_query(sql: &str) -> Result<LogicalPlan> {
680        let stmt = QueryLanguageParser::parse_sql(sql, &QueryContext::arc()).unwrap();
681        let engine = create_test_engine().await;
682        engine.planner().plan(&stmt, QueryContext::arc()).await
683    }
684
685    async fn query_plan_compare(sql: &str, expected: String) {
686        let plan = do_query(sql).await.unwrap();
687        assert_eq!(plan.display_indent_schema().to_string(), expected);
688    }
689
690    #[tokio::test]
691    async fn range_no_project() {
692        let query = r#"SELECT timestamp, tag_0, tag_1, avg(field_0 + field_1) RANGE '5m' FROM test ALIGN '1h' by (tag_0,tag_1);"#;
693        let expected = String::from(
694            "RangeSelect: range_exprs=[avg(test.field_0 + test.field_1) RANGE 5m], align=3600000ms, align_to=0ms, align_by=[test.tag_0, test.tag_1], time_index=timestamp [timestamp:Timestamp(Millisecond, None), tag_0:Utf8, tag_1:Utf8, avg(test.field_0 + test.field_1) RANGE 5m:Float64;N]\
695            \n  TableScan: test [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N, field_3:Float64;N, field_4:Float64;N]"
696        );
697        query_plan_compare(query, expected).await;
698    }
699
700    #[tokio::test]
701    async fn range_expr_calculation() {
702        let query = r#"SELECT (avg(field_0 + field_1)/4) RANGE '5m' FROM test ALIGN '1h' by (tag_0,tag_1);"#;
703        let expected = String::from(
704            "Projection: avg(test.field_0 + test.field_1) RANGE 5m / Int64(4) [avg(test.field_0 + test.field_1) RANGE 5m / Int64(4):Float64;N]\
705            \n  RangeSelect: range_exprs=[avg(test.field_0 + test.field_1) RANGE 5m], align=3600000ms, align_to=0ms, align_by=[test.tag_0, test.tag_1], time_index=timestamp [avg(test.field_0 + test.field_1) RANGE 5m:Float64;N, timestamp:Timestamp(Millisecond, None), tag_0:Utf8, tag_1:Utf8]\
706            \n    TableScan: test [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N, field_3:Float64;N, field_4:Float64;N]"
707        );
708        query_plan_compare(query, expected).await;
709    }
710
711    #[tokio::test]
712    async fn range_multi_args() {
713        let query =
714            r#"SELECT (covar(field_0 + field_1, field_1)/4) RANGE '5m' FROM test ALIGN '1h';"#;
715        let expected = String::from(
716            "Projection: covar_samp(test.field_0 + test.field_1,test.field_1) RANGE 5m / Int64(4) [covar_samp(test.field_0 + test.field_1,test.field_1) RANGE 5m / Int64(4):Float64;N]\
717            \n  RangeSelect: range_exprs=[covar_samp(test.field_0 + test.field_1,test.field_1) RANGE 5m], align=3600000ms, align_to=0ms, align_by=[test.tag_0, test.tag_1, test.tag_2, test.tag_3, test.tag_4], time_index=timestamp [covar_samp(test.field_0 + test.field_1,test.field_1) RANGE 5m:Float64;N, timestamp:Timestamp(Millisecond, None), tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8]\
718            \n    TableScan: test [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N, field_3:Float64;N, field_4:Float64;N]"
719        );
720        query_plan_compare(query, expected).await;
721    }
722
723    #[tokio::test]
724    async fn range_calculation() {
725        let query = r#"SELECT ((avg(field_0)+sum(field_1))/4) RANGE '5m' FROM test ALIGN '1h' by (tag_0,tag_1) FILL NULL;"#;
726        let expected = String::from(
727            "Projection: (avg(test.field_0) RANGE 5m FILL NULL + sum(test.field_1) RANGE 5m FILL NULL) / Int64(4) [avg(test.field_0) RANGE 5m FILL NULL + sum(test.field_1) RANGE 5m FILL NULL / Int64(4):Float64;N]\
728            \n  RangeSelect: range_exprs=[avg(test.field_0) RANGE 5m FILL NULL, sum(test.field_1) RANGE 5m FILL NULL], align=3600000ms, align_to=0ms, align_by=[test.tag_0, test.tag_1], time_index=timestamp [avg(test.field_0) RANGE 5m FILL NULL:Float64;N, sum(test.field_1) RANGE 5m FILL NULL:Float64;N, timestamp:Timestamp(Millisecond, None), tag_0:Utf8, tag_1:Utf8]\
729            \n    TableScan: test [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N, field_3:Float64;N, field_4:Float64;N]"
730        );
731        query_plan_compare(query, expected).await;
732    }
733
734    #[tokio::test]
735    async fn range_as_sub_query() {
736        let query = r#"SELECT foo + 1 from (SELECT ((avg(field_0)+sum(field_1))/4) RANGE '5m' as foo FROM test ALIGN '1h' by (tag_0,tag_1) FILL NULL) where foo > 1;"#;
737        let expected = String::from(
738            "Projection: foo + Int64(1) [foo + Int64(1):Float64;N]\
739            \n  Filter: foo > Int64(1) [foo:Float64;N]\
740            \n    Projection: (avg(test.field_0) RANGE 5m FILL NULL + sum(test.field_1) RANGE 5m FILL NULL) / Int64(4) AS foo [foo:Float64;N]\
741            \n      RangeSelect: range_exprs=[avg(test.field_0) RANGE 5m FILL NULL, sum(test.field_1) RANGE 5m FILL NULL], align=3600000ms, align_to=0ms, align_by=[test.tag_0, test.tag_1], time_index=timestamp [avg(test.field_0) RANGE 5m FILL NULL:Float64;N, sum(test.field_1) RANGE 5m FILL NULL:Float64;N, timestamp:Timestamp(Millisecond, None), tag_0:Utf8, tag_1:Utf8]\
742            \n        TableScan: test [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N, field_3:Float64;N, field_4:Float64;N]"
743        );
744        query_plan_compare(query, expected).await;
745    }
746
747    #[tokio::test]
748    async fn range_from_nest_query() {
749        let query = r#"SELECT ((avg(a)+sum(b))/4) RANGE '5m' FROM (SELECT field_0 as a, field_1 as b, tag_0 as c, tag_1 as d, timestamp from test where field_0 > 1.0) ALIGN '1h' by (c, d) FILL NULL;"#;
750        let expected = String::from(
751            "Projection: (avg(a) RANGE 5m FILL NULL + sum(b) RANGE 5m FILL NULL) / Int64(4) [avg(a) RANGE 5m FILL NULL + sum(b) RANGE 5m FILL NULL / Int64(4):Float64;N]\
752            \n  RangeSelect: range_exprs=[avg(a) RANGE 5m FILL NULL, sum(b) RANGE 5m FILL NULL], align=3600000ms, align_to=0ms, align_by=[c, d], time_index=timestamp [avg(a) RANGE 5m FILL NULL:Float64;N, sum(b) RANGE 5m FILL NULL:Float64;N, timestamp:Timestamp(Millisecond, None), c:Utf8, d:Utf8]\
753            \n    Projection: test.field_0 AS a, test.field_1 AS b, test.tag_0 AS c, test.tag_1 AS d, test.timestamp [a:Float64;N, b:Float64;N, c:Utf8, d:Utf8, timestamp:Timestamp(Millisecond, None)]\
754            \n      Filter: test.field_0 > Float64(1) [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N, field_3:Float64;N, field_4:Float64;N]\
755            \n        TableScan: test [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N, field_3:Float64;N, field_4:Float64;N]"
756         );
757        query_plan_compare(query, expected).await;
758    }
759
760    #[tokio::test]
761    async fn range_in_expr() {
762        let query = r#"SELECT sin(avg(field_0 + field_1) RANGE '5m' + 1) FROM test ALIGN '1h' by (tag_0,tag_1);"#;
763        let expected = String::from(
764            "Projection: sin(avg(test.field_0 + test.field_1) RANGE 5m + Int64(1)) [sin(avg(test.field_0 + test.field_1) RANGE 5m + Int64(1)):Float64;N]\
765            \n  RangeSelect: range_exprs=[avg(test.field_0 + test.field_1) RANGE 5m], align=3600000ms, align_to=0ms, align_by=[test.tag_0, test.tag_1], time_index=timestamp [avg(test.field_0 + test.field_1) RANGE 5m:Float64;N, timestamp:Timestamp(Millisecond, None), tag_0:Utf8, tag_1:Utf8]\
766            \n    TableScan: test [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N, field_3:Float64;N, field_4:Float64;N]"
767        );
768        query_plan_compare(query, expected).await;
769    }
770
771    #[tokio::test]
772    async fn duplicate_range_expr() {
773        let query = r#"SELECT avg(field_0) RANGE '5m' FILL 6.0 + avg(field_0) RANGE '5m' FILL 6.0 FROM test ALIGN '1h' by (tag_0,tag_1);"#;
774        let expected = String::from(
775            "Projection: avg(test.field_0) RANGE 5m FILL 6 + avg(test.field_0) RANGE 5m FILL 6 [avg(test.field_0) RANGE 5m FILL 6 + avg(test.field_0) RANGE 5m FILL 6:Float64]\
776            \n  RangeSelect: range_exprs=[avg(test.field_0) RANGE 5m FILL 6], align=3600000ms, align_to=0ms, align_by=[test.tag_0, test.tag_1], time_index=timestamp [avg(test.field_0) RANGE 5m FILL 6:Float64, timestamp:Timestamp(Millisecond, None), tag_0:Utf8, tag_1:Utf8]\
777            \n    TableScan: test [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N, field_3:Float64;N, field_4:Float64;N]"
778        );
779        query_plan_compare(query, expected).await;
780    }
781
782    #[tokio::test]
783    async fn deep_nest_range_expr() {
784        let query = r#"SELECT round(sin(avg(field_0 + field_1) RANGE '5m' + 1)) FROM test ALIGN '1h' by (tag_0,tag_1);"#;
785        let expected = String::from(
786            "Projection: round(sin(avg(test.field_0 + test.field_1) RANGE 5m + Int64(1))) [round(sin(avg(test.field_0 + test.field_1) RANGE 5m + Int64(1))):Float64;N]\
787            \n  RangeSelect: range_exprs=[avg(test.field_0 + test.field_1) RANGE 5m], align=3600000ms, align_to=0ms, align_by=[test.tag_0, test.tag_1], time_index=timestamp [avg(test.field_0 + test.field_1) RANGE 5m:Float64;N, timestamp:Timestamp(Millisecond, None), tag_0:Utf8, tag_1:Utf8]\
788            \n    TableScan: test [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N, field_3:Float64;N, field_4:Float64;N]"
789        );
790        query_plan_compare(query, expected).await;
791    }
792
793    #[tokio::test]
794    async fn complex_range_expr() {
795        let query = r#"SELECT gcd(CAST(max(field_0 + 1) Range '5m' FILL NULL AS Int64), CAST(tag_0 AS Int64)) + round(max(field_2+1) Range '6m' FILL NULL + 1) + max(field_2+3) Range '10m' FILL NULL * CAST(tag_1 AS Float64) + 1 FROM test ALIGN '1h' by (tag_0, tag_1);"#;
796        let expected = String::from(
797            "Projection: gcd(arrow_cast(max(test.field_0 + Int64(1)) RANGE 5m FILL NULL, Utf8(\"Int64\")), arrow_cast(test.tag_0, Utf8(\"Int64\"))) + round(max(test.field_2 + Int64(1)) RANGE 6m FILL NULL + Int64(1)) + max(test.field_2 + Int64(3)) RANGE 10m FILL NULL * arrow_cast(test.tag_1, Utf8(\"Float64\")) + Int64(1) [gcd(arrow_cast(max(test.field_0 + Int64(1)) RANGE 5m FILL NULL,Utf8(\"Int64\")),arrow_cast(test.tag_0,Utf8(\"Int64\"))) + round(max(test.field_2 + Int64(1)) RANGE 6m FILL NULL + Int64(1)) + max(test.field_2 + Int64(3)) RANGE 10m FILL NULL * arrow_cast(test.tag_1,Utf8(\"Float64\")) + Int64(1):Float64;N]\
798            \n  RangeSelect: range_exprs=[max(test.field_0 + Int64(1)) RANGE 5m FILL NULL, max(test.field_2 + Int64(1)) RANGE 6m FILL NULL, max(test.field_2 + Int64(3)) RANGE 10m FILL NULL], align=3600000ms, align_to=0ms, align_by=[test.tag_0, test.tag_1], time_index=timestamp [max(test.field_0 + Int64(1)) RANGE 5m FILL NULL:Float64;N, max(test.field_2 + Int64(1)) RANGE 6m FILL NULL:Float64;N, max(test.field_2 + Int64(3)) RANGE 10m FILL NULL:Float64;N, timestamp:Timestamp(Millisecond, None), tag_0:Utf8, tag_1:Utf8]\
799            \n    TableScan: test [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N, field_3:Float64;N, field_4:Float64;N]"
800        );
801        query_plan_compare(query, expected).await;
802    }
803
804    #[tokio::test]
805    async fn range_linear_on_integer() {
806        let query = r#"SELECT min(CAST(field_0 AS Int64) + CAST(field_1 AS Int64)) RANGE '5m' FILL LINEAR FROM test ALIGN '1h' by (tag_0,tag_1);"#;
807        let expected = String::from(
808            "RangeSelect: range_exprs=[min(arrow_cast(test.field_0,Utf8(\"Int64\")) + arrow_cast(test.field_1,Utf8(\"Int64\"))) RANGE 5m FILL LINEAR], align=3600000ms, align_to=0ms, align_by=[test.tag_0, test.tag_1], time_index=timestamp [min(arrow_cast(test.field_0,Utf8(\"Int64\")) + arrow_cast(test.field_1,Utf8(\"Int64\"))) RANGE 5m FILL LINEAR:Float64;N]\
809            \n  TableScan: test [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N, field_3:Float64;N, field_4:Float64;N]"
810        );
811        query_plan_compare(query, expected).await;
812    }
813
814    #[tokio::test]
815    async fn range_nest_range_err() {
816        let query = r#"SELECT sum(avg(field_0 + field_1) RANGE '5m' + 1) RANGE '5m' + 1 FROM test ALIGN '1h' by (tag_0,tag_1);"#;
817        assert_eq!(
818            do_query(query).await.unwrap_err().to_string(),
819            "Range Query: Nest Range Query is not allowed"
820        )
821    }
822
823    #[tokio::test]
824    /// Start directly from the rewritten SQL and check whether the error reported by the range expression rewriting is as expected.
825    /// the right argument is `range_fn(avg(field_0), '5m', 'NULL', '0', '1h')`
826    async fn range_argument_err_1() {
827        let query = r#"SELECT range_fn('5m', avg(field_0), 'NULL', '1', tag_0, '1h') FROM test group by tag_0;"#;
828        let error = do_query(query).await.unwrap_err().to_string();
829        assert_eq!(
830            error,
831            "Error during planning: Illegal argument `Utf8(\"5m\")` in range select query"
832        )
833    }
834
835    #[tokio::test]
836    async fn range_argument_err_2() {
837        let query = r#"SELECT range_fn(avg(field_0), 5, 'NULL', '1', tag_0, '1h') FROM test group by tag_0;"#;
838        let error = do_query(query).await.unwrap_err().to_string();
839        assert_eq!(
840            error,
841            "Error during planning: Illegal argument `Int64(5)` in range select query"
842        )
843    }
844
845    #[test]
846    fn test_parse_duration_expr() {
847        // test IntervalYearMonth
848        let interval = IntervalYearMonth::new(10);
849        let args = vec![Expr::Literal(ScalarValue::IntervalYearMonth(Some(
850            interval.to_i32(),
851        )))];
852        assert!(parse_duration_expr(&args, 0).is_err(),);
853        // test IntervalDayTime
854        let interval = IntervalDayTime::new(10, 10);
855        let args = vec![Expr::Literal(ScalarValue::IntervalDayTime(Some(
856            interval.into(),
857        )))];
858        assert_eq!(
859            parse_duration_expr(&args, 0).unwrap().as_millis() as i64,
860            interval.as_millis()
861        );
862        // test IntervalMonthDayNano
863        let interval = IntervalMonthDayNano::new(0, 10, 10);
864        let args = vec![Expr::Literal(ScalarValue::IntervalMonthDayNano(Some(
865            interval.into(),
866        )))];
867        assert_eq!(
868            parse_duration_expr(&args, 0).unwrap().as_millis() as i64,
869            interval.days as i64 * MS_PER_DAY + interval.nanoseconds / NANOS_PER_MILLI,
870        );
871        // test Duration
872        let args = vec![Expr::Literal(ScalarValue::Utf8(Some("1y4w".into())))];
873        assert_eq!(
874            parse_duration_expr(&args, 0).unwrap(),
875            parse_duration("1y4w").unwrap()
876        );
877        // test cast expression
878        let args = vec![Expr::Cast(Cast {
879            expr: Box::new(Expr::Literal(ScalarValue::Utf8(Some("15 minutes".into())))),
880            data_type: DataType::Interval(IntervalUnit::MonthDayNano),
881        })];
882        assert_eq!(
883            parse_duration_expr(&args, 0).unwrap(),
884            parse_duration("15m").unwrap()
885        );
886        // test index err
887        assert!(parse_duration_expr(&args, 10).is_err());
888        // test evaluate expr
889        let args = vec![Expr::BinaryExpr(BinaryExpr {
890            left: Box::new(Expr::Literal(ScalarValue::IntervalDayTime(Some(
891                IntervalDayTime::new(0, 10).into(),
892            )))),
893            op: Operator::Plus,
894            right: Box::new(Expr::Literal(ScalarValue::IntervalDayTime(Some(
895                IntervalDayTime::new(0, 10).into(),
896            )))),
897        })];
898        assert_eq!(
899            parse_duration_expr(&args, 0).unwrap(),
900            Duration::from_millis(20)
901        );
902        let args = vec![Expr::BinaryExpr(BinaryExpr {
903            left: Box::new(Expr::Literal(ScalarValue::IntervalDayTime(Some(
904                IntervalDayTime::new(0, 10).into(),
905            )))),
906            op: Operator::Minus,
907            right: Box::new(Expr::Literal(ScalarValue::IntervalDayTime(Some(
908                IntervalDayTime::new(0, 10).into(),
909            )))),
910        })];
911        // test zero interval error
912        assert!(parse_duration_expr(&args, 0).is_err());
913        // test must all be interval
914        let args = vec![Expr::BinaryExpr(BinaryExpr {
915            left: Box::new(Expr::Literal(ScalarValue::IntervalYearMonth(Some(
916                IntervalYearMonth::new(10).to_i32(),
917            )))),
918            op: Operator::Minus,
919            right: Box::new(Expr::Literal(ScalarValue::Time64Microsecond(Some(0)))),
920        })];
921        assert!(parse_duration_expr(&args, 0).is_err());
922    }
923
924    #[test]
925    fn test_parse_align_to() {
926        // test NOW
927        let args = vec![Expr::Literal(ScalarValue::Utf8(Some("NOW".into())))];
928        let epsinon = parse_align_to(&args, 0, None).unwrap() - Timestamp::current_millis().value();
929        assert!(epsinon.abs() < 100);
930        // test default
931        let args = vec![Expr::Literal(ScalarValue::Utf8(Some("".into())))];
932        assert_eq!(0, parse_align_to(&args, 0, None).unwrap());
933        // test default with timezone
934        let args = vec![Expr::Literal(ScalarValue::Utf8(Some("".into())))];
935        assert_eq!(
936            -36000 * 1000,
937            parse_align_to(&args, 0, Some(&Timezone::from_tz_string("HST").unwrap())).unwrap()
938        );
939        assert_eq!(
940            28800 * 1000,
941            parse_align_to(
942                &args,
943                0,
944                Some(&Timezone::from_tz_string("Asia/Shanghai").unwrap())
945            )
946            .unwrap()
947        );
948
949        // test Timestamp
950        let args = vec![Expr::Literal(ScalarValue::Utf8(Some(
951            "1970-01-01T00:00:00+08:00".into(),
952        )))];
953        assert_eq!(parse_align_to(&args, 0, None).unwrap(), -8 * 60 * 60 * 1000);
954        // timezone
955        let args = vec![Expr::Literal(ScalarValue::Utf8(Some(
956            "1970-01-01T00:00:00".into(),
957        )))];
958        assert_eq!(
959            parse_align_to(
960                &args,
961                0,
962                Some(&Timezone::from_tz_string("Asia/Shanghai").unwrap())
963            )
964            .unwrap(),
965            -8 * 60 * 60 * 1000
966        );
967        // test evaluate expr
968        let args = vec![Expr::BinaryExpr(BinaryExpr {
969            left: Box::new(Expr::Literal(ScalarValue::IntervalDayTime(Some(
970                IntervalDayTime::new(0, 10).into(),
971            )))),
972            op: Operator::Plus,
973            right: Box::new(Expr::Literal(ScalarValue::IntervalDayTime(Some(
974                IntervalDayTime::new(0, 10).into(),
975            )))),
976        })];
977        assert_eq!(parse_align_to(&args, 0, None).unwrap(), 20);
978    }
979
980    #[test]
981    fn test_interval_only() {
982        let expr = Expr::BinaryExpr(BinaryExpr {
983            left: Box::new(Expr::Literal(ScalarValue::DurationMillisecond(Some(20)))),
984            op: Operator::Minus,
985            right: Box::new(Expr::Literal(ScalarValue::IntervalDayTime(Some(
986                IntervalDayTime::new(10, 0).into(),
987            )))),
988        });
989        assert!(!interval_only_in_expr(&expr));
990        let expr = Expr::BinaryExpr(BinaryExpr {
991            left: Box::new(Expr::Literal(ScalarValue::IntervalDayTime(Some(
992                IntervalDayTime::new(10, 0).into(),
993            )))),
994            op: Operator::Minus,
995            right: Box::new(Expr::Literal(ScalarValue::IntervalDayTime(Some(
996                IntervalDayTime::new(10, 0).into(),
997            )))),
998        });
999        assert!(interval_only_in_expr(&expr));
1000
1001        let expr = Expr::BinaryExpr(BinaryExpr {
1002            left: Box::new(Expr::Cast(Cast {
1003                expr: Box::new(Expr::Literal(ScalarValue::Utf8(Some(
1004                    "15 minute".to_string(),
1005                )))),
1006                data_type: DataType::Interval(IntervalUnit::MonthDayNano),
1007            })),
1008            op: Operator::Minus,
1009            right: Box::new(Expr::Literal(ScalarValue::IntervalDayTime(Some(
1010                IntervalDayTime::new(10, 0).into(),
1011            )))),
1012        });
1013        assert!(interval_only_in_expr(&expr));
1014
1015        let expr = Expr::Cast(Cast {
1016            expr: Box::new(Expr::BinaryExpr(BinaryExpr {
1017                left: Box::new(Expr::Cast(Cast {
1018                    expr: Box::new(Expr::Literal(ScalarValue::Utf8(Some(
1019                        "15 minute".to_string(),
1020                    )))),
1021                    data_type: DataType::Interval(IntervalUnit::MonthDayNano),
1022                })),
1023                op: Operator::Minus,
1024                right: Box::new(Expr::Literal(ScalarValue::IntervalDayTime(Some(
1025                    IntervalDayTime::new(10, 0).into(),
1026                )))),
1027            })),
1028            data_type: DataType::Interval(IntervalUnit::MonthDayNano),
1029        });
1030
1031        assert!(interval_only_in_expr(&expr));
1032    }
1033}