1use std::collections::BTreeSet;
16use std::sync::Arc;
17use std::time::Duration;
18
19use arrow_schema::DataType;
20use async_recursion::async_recursion;
21use catalog::table_source::DfTableSourceProvider;
22use chrono::Utc;
23use common_time::interval::{MS_PER_DAY, NANOS_PER_MILLI};
24use common_time::timestamp::TimeUnit;
25use common_time::{IntervalDayTime, IntervalMonthDayNano, IntervalYearMonth, Timestamp, Timezone};
26use datafusion::datasource::DefaultTableSource;
27use datafusion::prelude::Column;
28use datafusion::scalar::ScalarValue;
29use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRecursion, TreeNodeRewriter};
30use datafusion_common::{DFSchema, DataFusionError, Result as DFResult};
31use datafusion_expr::execution_props::ExecutionProps;
32use datafusion_expr::expr::WildcardOptions;
33use datafusion_expr::simplify::SimplifyContext;
34use datafusion_expr::{
35    Aggregate, Analyze, Cast, Distinct, DistinctOn, Explain, Expr, ExprSchemable, Extension,
36    Literal, LogicalPlan, LogicalPlanBuilder, Projection,
37};
38use datafusion_optimizer::simplify_expressions::ExprSimplifier;
39use datatypes::prelude::ConcreteDataType;
40use promql_parser::util::parse_duration;
41use session::context::QueryContextRef;
42use snafu::{OptionExt, ResultExt, ensure};
43use table::table::adapter::DfTableProviderAdapter;
44
45use crate::error::{
46    CatalogSnafu, RangeQuerySnafu, Result, TimeIndexNotFoundSnafu, UnknownTableSnafu,
47};
48use crate::plan::ExtractExpr;
49use crate::range_select::plan::{Fill, RangeFn, RangeSelect};
50
51pub struct RangeExprRewriter<'a> {
55    input_plan: &'a Arc<LogicalPlan>,
56    align: Duration,
57    align_to: i64,
58    by: Vec<Expr>,
59    range_fn: BTreeSet<RangeFn>,
61    sub_aggr: &'a Aggregate,
62    query_ctx: &'a QueryContextRef,
63}
64
65impl RangeExprRewriter<'_> {
66    pub fn get_range_expr(&self, args: &[Expr], i: usize) -> DFResult<Expr> {
67        match args.get(i) {
68            Some(Expr::Column(column)) => {
69                let index = self.sub_aggr.schema.index_of_column(column)?;
70                let len = self.sub_aggr.group_expr.len();
71                self.sub_aggr
72                    .aggr_expr
73                    .get(index - len)
74                    .cloned()
75                    .ok_or(DataFusionError::Plan(
76                        "Range expr not found in underlying Aggregate Plan".into(),
77                    ))
78            }
79            Some(Expr::Alias(alias)) => {
80                self.get_range_expr(std::slice::from_ref(alias.expr.as_ref()), 0)
81            }
82            other => Err(dispose_parse_error(other)),
83        }
84    }
85}
86
87#[inline]
88fn dispose_parse_error(expr: Option<&Expr>) -> DataFusionError {
89    DataFusionError::Plan(
90        expr.map(|x| {
91            format!(
92                "Illegal argument `{}` in range select query",
93                x.schema_name()
94            )
95        })
96        .unwrap_or("Missing argument in range select query".into()),
97    )
98}
99
100fn parse_str_expr(args: &[Expr], i: usize) -> DFResult<&str> {
101    match args.get(i) {
102        Some(Expr::Literal(ScalarValue::Utf8(Some(str)), _)) => Ok(str.as_str()),
103        other => Err(dispose_parse_error(other)),
104    }
105}
106
107fn parse_expr_to_string(args: &[Expr], i: usize) -> DFResult<String> {
108    match args.get(i) {
109        Some(Expr::Literal(ScalarValue::Utf8(Some(str)), _)) => Ok(str.clone()),
110        Some(expr) => Ok(expr.schema_name().to_string()),
111        None => Err(dispose_parse_error(None)),
112    }
113}
114
115fn parse_duration_expr(args: &[Expr], i: usize) -> DFResult<Duration> {
120    match args.get(i) {
121        Some(Expr::Literal(ScalarValue::Utf8(Some(str)), _)) => {
122            parse_duration(str).map_err(DataFusionError::Plan)
123        }
124        Some(expr) => {
125            let ms = evaluate_expr_to_millisecond(args, i, true)?;
126            if ms <= 0 {
127                return Err(dispose_parse_error(Some(expr)));
128            }
129            Ok(Duration::from_millis(ms as u64))
130        }
131        None => Err(dispose_parse_error(None)),
132    }
133}
134
135fn evaluate_expr_to_millisecond(args: &[Expr], i: usize, interval_only: bool) -> DFResult<i64> {
143    let Some(expr) = args.get(i) else {
144        return Err(dispose_parse_error(None));
145    };
146    if interval_only && !interval_only_in_expr(expr) {
147        return Err(dispose_parse_error(Some(expr)));
148    }
149    let execution_props = ExecutionProps::new().with_query_execution_start_time(Utc::now());
150    let info = SimplifyContext::new(&execution_props).with_schema(Arc::new(DFSchema::empty()));
151    let simplify_expr = ExprSimplifier::new(info).simplify(expr.clone())?;
152    match simplify_expr {
153        Expr::Literal(ScalarValue::TimestampNanosecond(ts_nanos, _), _)
154        | Expr::Literal(ScalarValue::DurationNanosecond(ts_nanos), _) => {
155            ts_nanos.map(|v| v / 1_000_000)
156        }
157        Expr::Literal(ScalarValue::TimestampMicrosecond(ts_micros, _), _)
158        | Expr::Literal(ScalarValue::DurationMicrosecond(ts_micros), _) => {
159            ts_micros.map(|v| v / 1_000)
160        }
161        Expr::Literal(ScalarValue::TimestampMillisecond(ts_millis, _), _)
162        | Expr::Literal(ScalarValue::DurationMillisecond(ts_millis), _) => ts_millis,
163        Expr::Literal(ScalarValue::TimestampSecond(ts_secs, _), _)
164        | Expr::Literal(ScalarValue::DurationSecond(ts_secs), _) => ts_secs.map(|v| v * 1_000),
165        Expr::Literal(ScalarValue::IntervalYearMonth(interval), _) => interval
167            .map(|v| {
168                let interval = IntervalYearMonth::from_i32(v);
169                if interval.months != 0 {
170                    return Err(DataFusionError::Plan(format!(
171                        "Year or month interval is not allowed in range query: {}",
172                        expr.schema_name()
173                    )));
174                }
175
176                Ok(0)
177            })
178            .transpose()?,
179        Expr::Literal(ScalarValue::IntervalDayTime(interval), _) => interval.map(|v| {
180            let interval = IntervalDayTime::from(v);
181            interval.as_millis()
182        }),
183        Expr::Literal(ScalarValue::IntervalMonthDayNano(interval), _) => interval
184            .map(|v| {
185                let interval = IntervalMonthDayNano::from(v);
186                if interval.months != 0 {
187                    return Err(DataFusionError::Plan(format!(
188                        "Year or month interval is not allowed in range query: {}",
189                        expr.schema_name()
190                    )));
191                }
192
193                Ok(interval.days as i64 * MS_PER_DAY + interval.nanoseconds / NANOS_PER_MILLI)
194            })
195            .transpose()?,
196        _ => None,
197    }
198    .ok_or_else(|| {
199        DataFusionError::Plan(format!(
200            "{} is not a expr can be evaluate and use in range query",
201            expr.schema_name()
202        ))
203    })
204}
205
206fn parse_align_to(args: &[Expr], i: usize, timezone: Option<&Timezone>) -> DFResult<i64> {
213    let Ok(s) = parse_str_expr(args, i) else {
214        return evaluate_expr_to_millisecond(args, i, false);
215    };
216    let upper = s.to_uppercase();
217    match upper.as_str() {
218        "NOW" => return Ok(Timestamp::current_millis().value()),
219        "" => return Ok(timezone.map(|tz| tz.local_minus_utc() * 1000).unwrap_or(0)),
221        _ => (),
222    }
223
224    Timestamp::from_str(s, timezone)
225        .map_err(|e| {
226            DataFusionError::Plan(format!(
227                "Illegal `align to` argument `{}` in range select query, can't be parse as NOW/CALENDAR/Timestamp, error: {}",
228                s, e
229            ))
230        })?.convert_to(TimeUnit::Millisecond).map(|x|x.value()).ok_or(DataFusionError::Plan(format!(
231            "Illegal `align to` argument `{}` in range select query, can't be convert to a valid Timestamp",
232            s
233        ))
234        )
235}
236
237fn parse_expr_list(args: &[Expr], start: usize, len: usize) -> DFResult<Vec<Expr>> {
238    let mut outs = Vec::with_capacity(len);
239    for i in start..start + len {
240        outs.push(match &args.get(i) {
241            Some(
242                Expr::Column(_)
243                | Expr::Literal(_, _)
244                | Expr::BinaryExpr(_)
245                | Expr::ScalarFunction(_),
246            ) => args[i].clone(),
247            Some(Expr::Alias(alias)) if matches!(*alias.expr, Expr::ScalarFunction(_)) => {
248                args[i].clone()
249            }
250            other => {
251                return Err(dispose_parse_error(*other));
252            }
253        });
254    }
255    Ok(outs)
256}
257
/// Stores the local variable `$name` into `$self.$name`, after verifying consistency.
///
/// When `$cond` holds and the stored value differs from the local one, the enclosing function
/// returns an "Inconsistent ..." plan error; otherwise the local value is assigned to the field.
/// The local variable must have the same identifier as the field.
macro_rules! inconsistent_check {
    ($self: ident.$name: ident, $cond: expr) => {
        let conflicting = $cond && $self.$name != $name;
        if conflicting {
            return Err(DataFusionError::Plan(
                concat!(
                    "Inconsistent ",
                    stringify!($name),
                    " given in Range Function Rewrite"
                )
                .into(),
            ));
        }
        $self.$name = $name;
    };
}
274
275impl TreeNodeRewriter for RangeExprRewriter<'_> {
276    type Node = Expr;
277
278    fn f_down(&mut self, node: Expr) -> DFResult<Transformed<Expr>> {
279        if let Expr::ScalarFunction(func) = &node
280            && func.name() == "range_fn"
281        {
282            let range_expr = self.get_range_expr(&func.args, 0)?;
285            let range = parse_duration_expr(&func.args, 1)?;
286            let byc = str::parse::<usize>(parse_str_expr(&func.args, 3)?)
287                .map_err(|e| DataFusionError::Plan(e.to_string()))?;
288            let by = parse_expr_list(&func.args, 4, byc)?;
289            let align = parse_duration_expr(&func.args, byc + 4)?;
290            let align_to = parse_align_to(&func.args, byc + 5, Some(&self.query_ctx.timezone()))?;
291            let mut data_type = range_expr.get_type(self.input_plan.schema())?;
292            let mut need_cast = false;
293            let fill = Fill::try_from_str(parse_str_expr(&func.args, 2)?, &data_type)?;
294            if matches!(fill, Some(Fill::Linear)) && data_type.is_integer() {
295                data_type = DataType::Float64;
296                need_cast = true;
297            }
298            inconsistent_check!(self.by, !self.by.is_empty());
299            inconsistent_check!(self.align, self.align != Duration::default());
300            inconsistent_check!(self.align_to, self.align_to != 0);
301            let range_fn = RangeFn {
302                name: if let Some(fill) = &fill {
303                    format!(
304                        "{} RANGE {} FILL {}",
305                        range_expr.schema_name(),
306                        parse_expr_to_string(&func.args, 1)?,
307                        fill
308                    )
309                } else {
310                    format!(
311                        "{} RANGE {}",
312                        range_expr.schema_name(),
313                        parse_expr_to_string(&func.args, 1)?,
314                    )
315                },
316                data_type,
317                expr: range_expr,
318                range,
319                fill,
320                need_cast,
321            };
322            let alias = Expr::Column(Column::from_name(range_fn.name.clone()));
323            self.range_fn.insert(range_fn);
324            return Ok(Transformed::yes(alias));
325        }
326        Ok(Transformed::no(node))
327    }
328}
329
/// Rewrites logical plans that contain `range_fn` calls into plans built around a
/// [`RangeSelect`] extension node.
pub struct RangePlanRewriter {
    /// Used to resolve the tables referenced by a plan's schema, in order to find their time
    /// index and row-key columns.
    table_provider: DfTableSourceProvider,
    /// Query context handed to the expression rewriter.
    query_ctx: QueryContextRef,
}
340
341impl RangePlanRewriter {
342    pub fn new(table_provider: DfTableSourceProvider, query_ctx: QueryContextRef) -> Self {
343        Self {
344            table_provider,
345            query_ctx,
346        }
347    }
348
349    pub async fn rewrite(&mut self, plan: LogicalPlan) -> Result<LogicalPlan> {
350        match self.rewrite_logical_plan(&plan).await? {
351            Some(new_plan) => Ok(new_plan),
352            None => Ok(plan),
353        }
354    }
355
356    #[async_recursion]
357    async fn rewrite_logical_plan(&mut self, plan: &LogicalPlan) -> Result<Option<LogicalPlan>> {
358        let inputs = plan.inputs();
359        let mut new_inputs = Vec::with_capacity(inputs.len());
360        for input in &inputs {
361            new_inputs.push(self.rewrite_logical_plan(input).await?)
362        }
363        match plan {
364            LogicalPlan::Projection(Projection { expr, input, .. })
365                if have_range_in_exprs(expr) =>
366            {
367                let (aggr_plan, input) = if let LogicalPlan::Aggregate(aggr) = input.as_ref() {
368                    if have_range_in_exprs(&aggr.aggr_expr) {
370                        return RangeQuerySnafu {
371                            msg: "Nest Range Query is not allowed",
372                        }
373                        .fail();
374                    }
375                    (aggr, aggr.input.clone())
376                } else {
377                    return RangeQuerySnafu {
378                        msg: "Window functions is not allowed in Range Query",
379                    }
380                    .fail();
381                };
382                let (time_index, default_by) = self.get_index_by(input.schema()).await?;
383                let mut range_rewriter = RangeExprRewriter {
384                    input_plan: &input,
385                    align: Duration::default(),
386                    align_to: 0,
387                    by: vec![],
388                    range_fn: BTreeSet::new(),
389                    sub_aggr: aggr_plan,
390                    query_ctx: &self.query_ctx,
391                };
392                let new_expr = expr
393                    .iter()
394                    .map(|expr| expr.clone().rewrite(&mut range_rewriter).map(|x| x.data))
395                    .collect::<DFResult<Vec<_>>>()?;
396                if range_rewriter.by.is_empty() {
397                    range_rewriter.by = default_by;
398                }
399                let range_select = RangeSelect::try_new(
400                    input.clone(),
401                    range_rewriter.range_fn.into_iter().collect(),
402                    range_rewriter.align,
403                    range_rewriter.align_to,
404                    time_index,
405                    range_rewriter.by,
406                    &new_expr,
407                )?;
408                let no_additional_project = range_select.schema_project.is_some();
409                let range_plan = LogicalPlan::Extension(Extension {
410                    node: Arc::new(range_select),
411                });
412                if no_additional_project {
413                    Ok(Some(range_plan))
414                } else {
415                    let project_plan = LogicalPlanBuilder::from(range_plan)
416                        .project(new_expr)
417                        .and_then(|x| x.build())?;
418                    Ok(Some(project_plan))
419                }
420            }
421            _ => {
422                if new_inputs.iter().any(|x| x.is_some()) {
423                    let inputs: Vec<LogicalPlan> = new_inputs
424                        .into_iter()
425                        .zip(inputs)
426                        .map(|(x, y)| match x {
427                            Some(plan) => plan,
428                            None => y.clone(),
429                        })
430                        .collect();
431                    let plan = match plan {
435                        LogicalPlan::Analyze(Analyze { verbose, .. }) => {
436                            ensure!(
437                                inputs.len() == 1,
438                                RangeQuerySnafu {
439                                    msg: "Illegal subplan nums when rewrite Analyze logical plan",
440                                }
441                            );
442                            LogicalPlanBuilder::from(inputs[0].clone())
443                                .explain(*verbose, true)?
444                                .build()
445                        }
446                        LogicalPlan::Explain(Explain { verbose, .. }) => {
447                            ensure!(
448                                inputs.len() == 1,
449                                RangeQuerySnafu {
450                                    msg: "Illegal subplan nums when rewrite Explain logical plan",
451                                }
452                            );
453                            LogicalPlanBuilder::from(inputs[0].clone())
454                                .explain(*verbose, false)?
455                                .build()
456                        }
457                        LogicalPlan::Distinct(Distinct::On(DistinctOn {
458                            on_expr,
459                            select_expr,
460                            sort_expr,
461                            ..
462                        })) => {
463                            ensure!(
464                                inputs.len() == 1,
465                                RangeQuerySnafu {
466                                    msg: "Illegal subplan nums when rewrite DistinctOn logical plan",
467                                }
468                            );
469                            LogicalPlanBuilder::from(inputs[0].clone())
470                                .distinct_on(
471                                    on_expr.clone(),
472                                    select_expr.clone(),
473                                    sort_expr.clone(),
474                                )?
475                                .build()
476                        }
477                        _ => plan.with_new_exprs(plan.expressions_consider_join(), inputs),
478                    }?;
479                    Ok(Some(plan))
480                } else {
481                    Ok(None)
482                }
483            }
484        }
485    }
486
487    async fn get_index_by(&mut self, schema: &Arc<DFSchema>) -> Result<(Expr, Vec<Expr>)> {
492        #[allow(deprecated)]
493        let mut time_index_expr = Expr::Wildcard {
494            qualifier: None,
495            options: Box::new(WildcardOptions::default()),
496        };
497        let mut default_by = vec![];
498        for i in 0..schema.fields().len() {
499            let (qualifier, _) = schema.qualified_field(i);
500            if let Some(table_ref) = qualifier {
501                let table = self
502                    .table_provider
503                    .resolve_table(table_ref.clone())
504                    .await
505                    .context(CatalogSnafu)?
506                    .as_any()
507                    .downcast_ref::<DefaultTableSource>()
508                    .context(UnknownTableSnafu)?
509                    .table_provider
510                    .as_any()
511                    .downcast_ref::<DfTableProviderAdapter>()
512                    .context(UnknownTableSnafu)?
513                    .table();
514                let schema = table.schema();
515                let time_index_column =
516                    schema
517                        .timestamp_column()
518                        .with_context(|| TimeIndexNotFoundSnafu {
519                            table: table_ref.to_string(),
520                        })?;
521                if let ConcreteDataType::Timestamp(_) = time_index_column.data_type {
523                    default_by = table
524                        .table_info()
525                        .meta
526                        .row_key_column_names()
527                        .map(|key| Expr::Column(Column::new(Some(table_ref.clone()), key)))
528                        .collect();
529                    if default_by.is_empty() {
533                        default_by = vec![1.lit()];
534                    }
535                    time_index_expr = Expr::Column(Column::new(
536                        Some(table_ref.clone()),
537                        time_index_column.name.clone(),
538                    ));
539                }
540            }
541        }
542        #[allow(deprecated)]
543        if matches!(time_index_expr, Expr::Wildcard { .. }) {
544            TimeIndexNotFoundSnafu {
545                table: schema.to_string(),
546            }
547            .fail()
548        } else {
549            Ok((time_index_expr, default_by))
550        }
551    }
552}
553
554fn have_range_in_exprs(exprs: &[Expr]) -> bool {
555    exprs.iter().any(|expr| {
556        let mut find_range = false;
557        let _ = expr.apply(|expr| {
558            Ok(match expr {
559                Expr::ScalarFunction(func) if func.name() == "range_fn" => {
560                    find_range = true;
561                    TreeNodeRecursion::Stop
562                }
563                _ => TreeNodeRecursion::Continue,
564            })
565        });
566        find_range
567    })
568}
569
570fn interval_only_in_expr(expr: &Expr) -> bool {
571    let mut all_interval = true;
572    let _ = expr.apply(|expr| {
573        if matches!(
575            expr,
576            Expr::Cast(Cast{
577                expr,
578                data_type: DataType::Interval(_)
579            }) if matches!(&**expr, Expr::Literal(ScalarValue::Utf8(_), _))
580        ) {
581            return Ok(TreeNodeRecursion::Stop);
584        }
585
586        if !matches!(
587            expr,
588            Expr::Literal(ScalarValue::IntervalDayTime(_), _)
589                | Expr::Literal(ScalarValue::IntervalMonthDayNano(_), _)
590                | Expr::Literal(ScalarValue::IntervalYearMonth(_), _)
591                | Expr::BinaryExpr(_)
592                | Expr::Cast(Cast {
593                    data_type: DataType::Interval(_),
594                    ..
595                })
596        ) {
597            all_interval = false;
598            Ok(TreeNodeRecursion::Stop)
599        } else {
600            Ok(TreeNodeRecursion::Continue)
601        }
602    });
603
604    all_interval
605}
606
607#[cfg(test)]
608mod test {
609
610    use arrow::datatypes::IntervalUnit;
611    use catalog::RegisterTableRequest;
612    use catalog::memory::MemoryCatalogManager;
613    use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
614    use common_time::IntervalYearMonth;
615    use datafusion_expr::{BinaryExpr, Literal, Operator};
616    use datatypes::prelude::ConcreteDataType;
617    use datatypes::schema::{ColumnSchema, Schema};
618    use session::context::QueryContext;
619    use table::metadata::{TableInfoBuilder, TableMetaBuilder};
620    use table::test_util::EmptyTable;
621
622    use super::*;
623    use crate::options::QueryOptions;
624    use crate::parser::QueryLanguageParser;
625    use crate::{QueryEngineFactory, QueryEngineRef};
626
627    async fn create_test_engine() -> QueryEngineRef {
628        let table_name = "test".to_string();
629        let mut columns = vec![];
630        for i in 0..5 {
631            columns.push(ColumnSchema::new(
632                format!("tag_{i}"),
633                ConcreteDataType::string_datatype(),
634                false,
635            ));
636        }
637        columns.push(
638            ColumnSchema::new(
639                "timestamp".to_string(),
640                ConcreteDataType::timestamp_millisecond_datatype(),
641                false,
642            )
643            .with_time_index(true),
644        );
645        for i in 0..5 {
646            columns.push(ColumnSchema::new(
647                format!("field_{i}"),
648                ConcreteDataType::float64_datatype(),
649                true,
650            ));
651        }
652        let schema = Arc::new(Schema::new(columns));
653        let table_meta = TableMetaBuilder::empty()
654            .schema(schema)
655            .primary_key_indices((0..5).collect())
656            .value_indices((6..11).collect())
657            .next_column_id(1024)
658            .build()
659            .unwrap();
660        let table_info = TableInfoBuilder::default()
661            .name(&table_name)
662            .meta(table_meta)
663            .build()
664            .unwrap();
665        let table = EmptyTable::from_table_info(&table_info);
666        let catalog_list = MemoryCatalogManager::with_default_setup();
667        assert!(
668            catalog_list
669                .register_table_sync(RegisterTableRequest {
670                    catalog: DEFAULT_CATALOG_NAME.to_string(),
671                    schema: DEFAULT_SCHEMA_NAME.to_string(),
672                    table_name,
673                    table_id: 1024,
674                    table,
675                })
676                .is_ok()
677        );
678        QueryEngineFactory::new(
679            catalog_list,
680            None,
681            None,
682            None,
683            None,
684            false,
685            QueryOptions::default(),
686        )
687        .query_engine()
688    }
689
690    async fn do_query(sql: &str) -> Result<LogicalPlan> {
691        let stmt = QueryLanguageParser::parse_sql(sql, &QueryContext::arc()).unwrap();
692        let engine = create_test_engine().await;
693        engine.planner().plan(&stmt, QueryContext::arc()).await
694    }
695
696    async fn query_plan_compare(sql: &str, expected: String) {
697        let plan = do_query(sql).await.unwrap();
698        assert_eq!(plan.display_indent_schema().to_string(), expected);
699    }
700
701    #[tokio::test]
702    async fn range_no_project() {
703        let query = r#"SELECT timestamp, tag_0, tag_1, avg(field_0 + field_1) RANGE '5m' FROM test ALIGN '1h' by (tag_0,tag_1);"#;
704        let expected = String::from(
705            "RangeSelect: range_exprs=[avg(test.field_0 + test.field_1) RANGE 5m], align=3600000ms, align_to=0ms, align_by=[test.tag_0, test.tag_1], time_index=timestamp [timestamp:Timestamp(Millisecond, None), tag_0:Utf8, tag_1:Utf8, avg(test.field_0 + test.field_1) RANGE 5m:Float64;N]\
706            \n  TableScan: test [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N, field_3:Float64;N, field_4:Float64;N]",
707        );
708        query_plan_compare(query, expected).await;
709    }
710
711    #[tokio::test]
712    async fn range_expr_calculation() {
713        let query = r#"SELECT (avg(field_0 + field_1)/4) RANGE '5m' FROM test ALIGN '1h' by (tag_0,tag_1);"#;
714        let expected = String::from(
715            "Projection: avg(test.field_0 + test.field_1) RANGE 5m / Int64(4) [avg(test.field_0 + test.field_1) RANGE 5m / Int64(4):Float64;N]\
716            \n  RangeSelect: range_exprs=[avg(test.field_0 + test.field_1) RANGE 5m], align=3600000ms, align_to=0ms, align_by=[test.tag_0, test.tag_1], time_index=timestamp [avg(test.field_0 + test.field_1) RANGE 5m:Float64;N, timestamp:Timestamp(Millisecond, None), tag_0:Utf8, tag_1:Utf8]\
717            \n    TableScan: test [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N, field_3:Float64;N, field_4:Float64;N]",
718        );
719        query_plan_compare(query, expected).await;
720    }
721
722    #[tokio::test]
723    async fn range_multi_args() {
724        let query =
725            r#"SELECT (covar(field_0 + field_1, field_1)/4) RANGE '5m' FROM test ALIGN '1h';"#;
726        let expected = String::from(
727            "Projection: covar_samp(test.field_0 + test.field_1,test.field_1) RANGE 5m / Int64(4) [covar_samp(test.field_0 + test.field_1,test.field_1) RANGE 5m / Int64(4):Float64;N]\
728            \n  RangeSelect: range_exprs=[covar_samp(test.field_0 + test.field_1,test.field_1) RANGE 5m], align=3600000ms, align_to=0ms, align_by=[test.tag_0, test.tag_1, test.tag_2, test.tag_3, test.tag_4], time_index=timestamp [covar_samp(test.field_0 + test.field_1,test.field_1) RANGE 5m:Float64;N, timestamp:Timestamp(Millisecond, None), tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8]\
729            \n    TableScan: test [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N, field_3:Float64;N, field_4:Float64;N]",
730        );
731        query_plan_compare(query, expected).await;
732    }
733
734    #[tokio::test]
735    async fn range_calculation() {
736        let query = r#"SELECT ((avg(field_0)+sum(field_1))/4) RANGE '5m' FROM test ALIGN '1h' by (tag_0,tag_1) FILL NULL;"#;
737        let expected = String::from(
738            "Projection: (avg(test.field_0) RANGE 5m FILL NULL + sum(test.field_1) RANGE 5m FILL NULL) / Int64(4) [avg(test.field_0) RANGE 5m FILL NULL + sum(test.field_1) RANGE 5m FILL NULL / Int64(4):Float64;N]\
739            \n  RangeSelect: range_exprs=[avg(test.field_0) RANGE 5m FILL NULL, sum(test.field_1) RANGE 5m FILL NULL], align=3600000ms, align_to=0ms, align_by=[test.tag_0, test.tag_1], time_index=timestamp [avg(test.field_0) RANGE 5m FILL NULL:Float64;N, sum(test.field_1) RANGE 5m FILL NULL:Float64;N, timestamp:Timestamp(Millisecond, None), tag_0:Utf8, tag_1:Utf8]\
740            \n    TableScan: test [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N, field_3:Float64;N, field_4:Float64;N]",
741        );
742        query_plan_compare(query, expected).await;
743    }
744
745    #[tokio::test]
746    async fn range_as_sub_query() {
747        let query = r#"SELECT foo + 1 from (SELECT ((avg(field_0)+sum(field_1))/4) RANGE '5m' as foo FROM test ALIGN '1h' by (tag_0,tag_1) FILL NULL) where foo > 1;"#;
748        let expected = String::from(
749            "Projection: foo + Int64(1) [foo + Int64(1):Float64;N]\
750            \n  Filter: foo > Int64(1) [foo:Float64;N]\
751            \n    Projection: (avg(test.field_0) RANGE 5m FILL NULL + sum(test.field_1) RANGE 5m FILL NULL) / Int64(4) AS foo [foo:Float64;N]\
752            \n      RangeSelect: range_exprs=[avg(test.field_0) RANGE 5m FILL NULL, sum(test.field_1) RANGE 5m FILL NULL], align=3600000ms, align_to=0ms, align_by=[test.tag_0, test.tag_1], time_index=timestamp [avg(test.field_0) RANGE 5m FILL NULL:Float64;N, sum(test.field_1) RANGE 5m FILL NULL:Float64;N, timestamp:Timestamp(Millisecond, None), tag_0:Utf8, tag_1:Utf8]\
753            \n        TableScan: test [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N, field_3:Float64;N, field_4:Float64;N]",
754        );
755        query_plan_compare(query, expected).await;
756    }
757
758    #[tokio::test]
759    async fn range_from_nest_query() {
760        let query = r#"SELECT ((avg(a)+sum(b))/4) RANGE '5m' FROM (SELECT field_0 as a, field_1 as b, tag_0 as c, tag_1 as d, timestamp from test where field_0 > 1.0) ALIGN '1h' by (c, d) FILL NULL;"#;
761        let expected = String::from(
762            "Projection: (avg(a) RANGE 5m FILL NULL + sum(b) RANGE 5m FILL NULL) / Int64(4) [avg(a) RANGE 5m FILL NULL + sum(b) RANGE 5m FILL NULL / Int64(4):Float64;N]\
763            \n  RangeSelect: range_exprs=[avg(a) RANGE 5m FILL NULL, sum(b) RANGE 5m FILL NULL], align=3600000ms, align_to=0ms, align_by=[c, d], time_index=timestamp [avg(a) RANGE 5m FILL NULL:Float64;N, sum(b) RANGE 5m FILL NULL:Float64;N, timestamp:Timestamp(Millisecond, None), c:Utf8, d:Utf8]\
764            \n    Projection: test.field_0 AS a, test.field_1 AS b, test.tag_0 AS c, test.tag_1 AS d, test.timestamp [a:Float64;N, b:Float64;N, c:Utf8, d:Utf8, timestamp:Timestamp(Millisecond, None)]\
765            \n      Filter: test.field_0 > Float64(1) [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N, field_3:Float64;N, field_4:Float64;N]\
766            \n        TableScan: test [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N, field_3:Float64;N, field_4:Float64;N]",
767        );
768        query_plan_compare(query, expected).await;
769    }
770
771    #[tokio::test]
772    async fn range_in_expr() {
773        let query = r#"SELECT sin(avg(field_0 + field_1) RANGE '5m' + 1) FROM test ALIGN '1h' by (tag_0,tag_1);"#;
774        let expected = String::from(
775            "Projection: sin(avg(test.field_0 + test.field_1) RANGE 5m + Int64(1)) [sin(avg(test.field_0 + test.field_1) RANGE 5m + Int64(1)):Float64;N]\
776            \n  RangeSelect: range_exprs=[avg(test.field_0 + test.field_1) RANGE 5m], align=3600000ms, align_to=0ms, align_by=[test.tag_0, test.tag_1], time_index=timestamp [avg(test.field_0 + test.field_1) RANGE 5m:Float64;N, timestamp:Timestamp(Millisecond, None), tag_0:Utf8, tag_1:Utf8]\
777            \n    TableScan: test [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N, field_3:Float64;N, field_4:Float64;N]",
778        );
779        query_plan_compare(query, expected).await;
780    }
781
782    #[tokio::test]
783    async fn duplicate_range_expr() {
784        let query = r#"SELECT avg(field_0) RANGE '5m' FILL 6.0 + avg(field_0) RANGE '5m' FILL 6.0 FROM test ALIGN '1h' by (tag_0,tag_1);"#;
785        let expected = String::from(
786            "Projection: avg(test.field_0) RANGE 5m FILL 6 + avg(test.field_0) RANGE 5m FILL 6 [avg(test.field_0) RANGE 5m FILL 6 + avg(test.field_0) RANGE 5m FILL 6:Float64]\
787            \n  RangeSelect: range_exprs=[avg(test.field_0) RANGE 5m FILL 6], align=3600000ms, align_to=0ms, align_by=[test.tag_0, test.tag_1], time_index=timestamp [avg(test.field_0) RANGE 5m FILL 6:Float64, timestamp:Timestamp(Millisecond, None), tag_0:Utf8, tag_1:Utf8]\
788            \n    TableScan: test [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N, field_3:Float64;N, field_4:Float64;N]",
789        );
790        query_plan_compare(query, expected).await;
791    }
792
793    #[tokio::test]
794    async fn deep_nest_range_expr() {
795        let query = r#"SELECT round(sin(avg(field_0 + field_1) RANGE '5m' + 1)) FROM test ALIGN '1h' by (tag_0,tag_1);"#;
796        let expected = String::from(
797            "Projection: round(sin(avg(test.field_0 + test.field_1) RANGE 5m + Int64(1))) [round(sin(avg(test.field_0 + test.field_1) RANGE 5m + Int64(1))):Float64;N]\
798            \n  RangeSelect: range_exprs=[avg(test.field_0 + test.field_1) RANGE 5m], align=3600000ms, align_to=0ms, align_by=[test.tag_0, test.tag_1], time_index=timestamp [avg(test.field_0 + test.field_1) RANGE 5m:Float64;N, timestamp:Timestamp(Millisecond, None), tag_0:Utf8, tag_1:Utf8]\
799            \n    TableScan: test [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N, field_3:Float64;N, field_4:Float64;N]",
800        );
801        query_plan_compare(query, expected).await;
802    }
803
804    #[tokio::test]
805    async fn complex_range_expr() {
806        let query = r#"SELECT gcd(CAST(max(field_0 + 1) Range '5m' FILL NULL AS Int64), CAST(tag_0 AS Int64)) + round(max(field_2+1) Range '6m' FILL NULL + 1) + max(field_2+3) Range '10m' FILL NULL * CAST(tag_1 AS Float64) + 1 FROM test ALIGN '1h' by (tag_0, tag_1);"#;
807        let expected = String::from(
808            "Projection: gcd(arrow_cast(max(test.field_0 + Int64(1)) RANGE 5m FILL NULL, Utf8(\"Int64\")), arrow_cast(test.tag_0, Utf8(\"Int64\"))) + round(max(test.field_2 + Int64(1)) RANGE 6m FILL NULL + Int64(1)) + max(test.field_2 + Int64(3)) RANGE 10m FILL NULL * arrow_cast(test.tag_1, Utf8(\"Float64\")) + Int64(1) [gcd(arrow_cast(max(test.field_0 + Int64(1)) RANGE 5m FILL NULL,Utf8(\"Int64\")),arrow_cast(test.tag_0,Utf8(\"Int64\"))) + round(max(test.field_2 + Int64(1)) RANGE 6m FILL NULL + Int64(1)) + max(test.field_2 + Int64(3)) RANGE 10m FILL NULL * arrow_cast(test.tag_1,Utf8(\"Float64\")) + Int64(1):Float64;N]\
809            \n  RangeSelect: range_exprs=[max(test.field_0 + Int64(1)) RANGE 5m FILL NULL, max(test.field_2 + Int64(1)) RANGE 6m FILL NULL, max(test.field_2 + Int64(3)) RANGE 10m FILL NULL], align=3600000ms, align_to=0ms, align_by=[test.tag_0, test.tag_1], time_index=timestamp [max(test.field_0 + Int64(1)) RANGE 5m FILL NULL:Float64;N, max(test.field_2 + Int64(1)) RANGE 6m FILL NULL:Float64;N, max(test.field_2 + Int64(3)) RANGE 10m FILL NULL:Float64;N, timestamp:Timestamp(Millisecond, None), tag_0:Utf8, tag_1:Utf8]\
810            \n    TableScan: test [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N, field_3:Float64;N, field_4:Float64;N]",
811        );
812        query_plan_compare(query, expected).await;
813    }
814
815    #[tokio::test]
816    async fn range_linear_on_integer() {
817        let query = r#"SELECT min(CAST(field_0 AS Int64) + CAST(field_1 AS Int64)) RANGE '5m' FILL LINEAR FROM test ALIGN '1h' by (tag_0,tag_1);"#;
818        let expected = String::from(
819            "RangeSelect: range_exprs=[min(arrow_cast(test.field_0,Utf8(\"Int64\")) + arrow_cast(test.field_1,Utf8(\"Int64\"))) RANGE 5m FILL LINEAR], align=3600000ms, align_to=0ms, align_by=[test.tag_0, test.tag_1], time_index=timestamp [min(arrow_cast(test.field_0,Utf8(\"Int64\")) + arrow_cast(test.field_1,Utf8(\"Int64\"))) RANGE 5m FILL LINEAR:Float64;N]\
820            \n  TableScan: test [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N, field_3:Float64;N, field_4:Float64;N]",
821        );
822        query_plan_compare(query, expected).await;
823    }
824
825    #[tokio::test]
826    async fn range_nest_range_err() {
827        let query = r#"SELECT sum(avg(field_0 + field_1) RANGE '5m' + 1) RANGE '5m' + 1 FROM test ALIGN '1h' by (tag_0,tag_1);"#;
828        assert_eq!(
829            do_query(query).await.unwrap_err().to_string(),
830            "Range Query: Nest Range Query is not allowed"
831        )
832    }
833
834    #[tokio::test]
835    async fn range_argument_err_1() {
838        let query = r#"SELECT range_fn('5m', avg(field_0), 'NULL', '1', tag_0, '1h') FROM test group by tag_0;"#;
839        let error = do_query(query).await.unwrap_err().to_string();
840        assert_eq!(
841            error,
842            "Error during planning: Illegal argument `Utf8(\"5m\")` in range select query"
843        )
844    }
845
846    #[tokio::test]
847    async fn range_argument_err_2() {
848        let query = r#"SELECT range_fn(avg(field_0), 5, 'NULL', '1', tag_0, '1h') FROM test group by tag_0;"#;
849        let error = do_query(query).await.unwrap_err().to_string();
850        assert_eq!(
851            error,
852            "Error during planning: Illegal argument `Int64(5)` in range select query"
853        )
854    }
855
856    #[test]
857    fn test_parse_duration_expr() {
858        let interval = IntervalYearMonth::new(10);
860        let args = vec![ScalarValue::IntervalYearMonth(Some(interval.to_i32())).lit()];
861        assert!(parse_duration_expr(&args, 0).is_err(),);
862        let interval = IntervalDayTime::new(10, 10);
864        let args = vec![ScalarValue::IntervalDayTime(Some(interval.into())).lit()];
865        assert_eq!(
866            parse_duration_expr(&args, 0).unwrap().as_millis() as i64,
867            interval.as_millis()
868        );
869        let interval = IntervalMonthDayNano::new(0, 10, 10);
871        let args = vec![ScalarValue::IntervalMonthDayNano(Some(interval.into())).lit()];
872        assert_eq!(
873            parse_duration_expr(&args, 0).unwrap().as_millis() as i64,
874            interval.days as i64 * MS_PER_DAY + interval.nanoseconds / NANOS_PER_MILLI,
875        );
876        let args = vec!["1y4w".lit()];
878        assert_eq!(
879            parse_duration_expr(&args, 0).unwrap(),
880            parse_duration("1y4w").unwrap()
881        );
882        let args = vec![Expr::Cast(Cast {
884            expr: Box::new("15 minutes".lit()),
885            data_type: DataType::Interval(IntervalUnit::MonthDayNano),
886        })];
887        assert_eq!(
888            parse_duration_expr(&args, 0).unwrap(),
889            parse_duration("15m").unwrap()
890        );
891        assert!(parse_duration_expr(&args, 10).is_err());
893        let args = vec![Expr::BinaryExpr(BinaryExpr {
895            left: Box::new(
896                ScalarValue::IntervalDayTime(Some(IntervalDayTime::new(0, 10).into())).lit(),
897            ),
898            op: Operator::Plus,
899            right: Box::new(
900                ScalarValue::IntervalDayTime(Some(IntervalDayTime::new(0, 10).into())).lit(),
901            ),
902        })];
903        assert_eq!(
904            parse_duration_expr(&args, 0).unwrap(),
905            Duration::from_millis(20)
906        );
907        let args = vec![Expr::BinaryExpr(BinaryExpr {
908            left: Box::new(
909                ScalarValue::IntervalDayTime(Some(IntervalDayTime::new(0, 10).into())).lit(),
910            ),
911            op: Operator::Minus,
912            right: Box::new(
913                ScalarValue::IntervalDayTime(Some(IntervalDayTime::new(0, 10).into())).lit(),
914            ),
915        })];
916        assert!(parse_duration_expr(&args, 0).is_err());
918        let args = vec![Expr::BinaryExpr(BinaryExpr {
920            left: Box::new(
921                ScalarValue::IntervalYearMonth(Some(IntervalYearMonth::new(10).to_i32())).lit(),
922            ),
923            op: Operator::Minus,
924            right: Box::new(ScalarValue::Time64Microsecond(Some(0)).lit()),
925        })];
926        assert!(parse_duration_expr(&args, 0).is_err());
927    }
928
929    #[test]
930    fn test_parse_align_to() {
931        let args = vec!["NOW".lit()];
933        let epsinon = parse_align_to(&args, 0, None).unwrap() - Timestamp::current_millis().value();
934        assert!(epsinon.abs() < 100);
935        let args = vec!["".lit()];
937        assert_eq!(0, parse_align_to(&args, 0, None).unwrap());
938        let args = vec!["".lit()];
940        assert_eq!(
941            -36000 * 1000,
942            parse_align_to(&args, 0, Some(&Timezone::from_tz_string("HST").unwrap())).unwrap()
943        );
944        assert_eq!(
945            28800 * 1000,
946            parse_align_to(
947                &args,
948                0,
949                Some(&Timezone::from_tz_string("Asia/Shanghai").unwrap())
950            )
951            .unwrap()
952        );
953
954        let args = vec!["1970-01-01T00:00:00+08:00".lit()];
956        assert_eq!(parse_align_to(&args, 0, None).unwrap(), -8 * 60 * 60 * 1000);
957        let args = vec!["1970-01-01T00:00:00".lit()];
959        assert_eq!(
960            parse_align_to(
961                &args,
962                0,
963                Some(&Timezone::from_tz_string("Asia/Shanghai").unwrap())
964            )
965            .unwrap(),
966            -8 * 60 * 60 * 1000
967        );
968        let args = vec![Expr::BinaryExpr(BinaryExpr {
970            left: Box::new(
971                ScalarValue::IntervalDayTime(Some(IntervalDayTime::new(0, 10).into())).lit(),
972            ),
973            op: Operator::Plus,
974            right: Box::new(
975                ScalarValue::IntervalDayTime(Some(IntervalDayTime::new(0, 10).into())).lit(),
976            ),
977        })];
978        assert_eq!(parse_align_to(&args, 0, None).unwrap(), 20);
979    }
980
981    #[test]
982    fn test_interval_only() {
983        let expr = Expr::BinaryExpr(BinaryExpr {
984            left: Box::new(ScalarValue::DurationMillisecond(Some(20)).lit()),
985            op: Operator::Minus,
986            right: Box::new(
987                ScalarValue::IntervalDayTime(Some(IntervalDayTime::new(10, 0).into())).lit(),
988            ),
989        });
990        assert!(!interval_only_in_expr(&expr));
991        let expr = Expr::BinaryExpr(BinaryExpr {
992            left: Box::new(
993                ScalarValue::IntervalDayTime(Some(IntervalDayTime::new(10, 0).into())).lit(),
994            ),
995            op: Operator::Minus,
996            right: Box::new(
997                ScalarValue::IntervalDayTime(Some(IntervalDayTime::new(10, 0).into())).lit(),
998            ),
999        });
1000        assert!(interval_only_in_expr(&expr));
1001
1002        let expr = Expr::BinaryExpr(BinaryExpr {
1003            left: Box::new(Expr::Cast(Cast {
1004                expr: Box::new("15 minute".lit()),
1005                data_type: DataType::Interval(IntervalUnit::MonthDayNano),
1006            })),
1007            op: Operator::Minus,
1008            right: Box::new(
1009                ScalarValue::IntervalDayTime(Some(IntervalDayTime::new(10, 0).into())).lit(),
1010            ),
1011        });
1012        assert!(interval_only_in_expr(&expr));
1013
1014        let expr = Expr::Cast(Cast {
1015            expr: Box::new(Expr::BinaryExpr(BinaryExpr {
1016                left: Box::new(Expr::Cast(Cast {
1017                    expr: Box::new("15 minute".lit()),
1018                    data_type: DataType::Interval(IntervalUnit::MonthDayNano),
1019                })),
1020                op: Operator::Minus,
1021                right: Box::new(
1022                    ScalarValue::IntervalDayTime(Some(IntervalDayTime::new(10, 0).into())).lit(),
1023                ),
1024            })),
1025            data_type: DataType::Interval(IntervalUnit::MonthDayNano),
1026        });
1027
1028        assert!(interval_only_in_expr(&expr));
1029    }
1030}