1use std::collections::BTreeSet;
16use std::sync::Arc;
17use std::time::Duration;
18
19use arrow_schema::DataType;
20use async_recursion::async_recursion;
21use catalog::table_source::DfTableSourceProvider;
22use chrono::Utc;
23use common_time::interval::{MS_PER_DAY, NANOS_PER_MILLI};
24use common_time::timestamp::TimeUnit;
25use common_time::{IntervalDayTime, IntervalMonthDayNano, IntervalYearMonth, Timestamp, Timezone};
26use datafusion::datasource::DefaultTableSource;
27use datafusion::prelude::Column;
28use datafusion::scalar::ScalarValue;
29use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRecursion, TreeNodeRewriter};
30use datafusion_common::{DFSchema, DataFusionError, Result as DFResult};
31use datafusion_expr::execution_props::ExecutionProps;
32use datafusion_expr::expr::WildcardOptions;
33use datafusion_expr::simplify::SimplifyContext;
34use datafusion_expr::{
35 Aggregate, Analyze, Cast, Distinct, DistinctOn, Explain, Expr, ExprSchemable, Extension,
36 LogicalPlan, LogicalPlanBuilder, Projection,
37};
38use datafusion_optimizer::simplify_expressions::ExprSimplifier;
39use datatypes::prelude::ConcreteDataType;
40use promql_parser::util::parse_duration;
41use session::context::QueryContextRef;
42use snafu::{ensure, OptionExt, ResultExt};
43use table::table::adapter::DfTableProviderAdapter;
44
45use crate::error::{
46 CatalogSnafu, RangeQuerySnafu, Result, TimeIndexNotFoundSnafu, UnknownTableSnafu,
47};
48use crate::plan::ExtractExpr;
49use crate::range_select::plan::{Fill, RangeFn, RangeSelect};
50
/// Expression rewriter that walks projection expressions, collects every
/// `range_fn` scalar function call it meets, and replaces each call with a plain
/// column reference named after the range expression.
///
/// While rewriting it accumulates the parameters shared by all range
/// expressions of one query (`align`, `align_to`, `by`) together with the set of
/// distinct range functions.
pub struct RangeExprRewriter<'a> {
    /// Plan whose schema is used to resolve the data type of a range expression.
    input_plan: &'a Arc<LogicalPlan>,
    /// Alignment step collected from the `range_fn` arguments;
    /// `Duration::default()` until the first call has been rewritten.
    align: Duration,
    /// Alignment origin in milliseconds; `0` until the first call has been rewritten.
    align_to: i64,
    /// `by` expressions collected from the `range_fn` arguments.
    by: Vec<Expr>,
    /// Distinct range functions found so far. Being a set, an expression that
    /// appears several times in the projection is kept only once.
    range_fn: BTreeSet<RangeFn>,
    /// Aggregate node sitting below the projection; the first argument of
    /// `range_fn` is resolved against its aggregate expressions.
    sub_aggr: &'a Aggregate,
    /// Query context; its timezone is used when parsing the `align to` argument.
    query_ctx: &'a QueryContextRef,
}
64
65impl RangeExprRewriter<'_> {
66 pub fn get_range_expr(&self, args: &[Expr], i: usize) -> DFResult<Expr> {
67 match args.get(i) {
68 Some(Expr::Column(column)) => {
69 let index = self.sub_aggr.schema.index_of_column(column)?;
70 let len = self.sub_aggr.group_expr.len();
71 self.sub_aggr
72 .aggr_expr
73 .get(index - len)
74 .cloned()
75 .ok_or(DataFusionError::Plan(
76 "Range expr not found in underlying Aggregate Plan".into(),
77 ))
78 }
79 other => Err(dispose_parse_error(other)),
80 }
81 }
82}
83
84#[inline]
85fn dispose_parse_error(expr: Option<&Expr>) -> DataFusionError {
86 DataFusionError::Plan(
87 expr.map(|x| {
88 format!(
89 "Illegal argument `{}` in range select query",
90 x.schema_name()
91 )
92 })
93 .unwrap_or("Missing argument in range select query".into()),
94 )
95}
96
97fn parse_str_expr(args: &[Expr], i: usize) -> DFResult<&str> {
98 match args.get(i) {
99 Some(Expr::Literal(ScalarValue::Utf8(Some(str)))) => Ok(str.as_str()),
100 other => Err(dispose_parse_error(other)),
101 }
102}
103
104fn parse_expr_to_string(args: &[Expr], i: usize) -> DFResult<String> {
105 match args.get(i) {
106 Some(Expr::Literal(ScalarValue::Utf8(Some(str)))) => Ok(str.to_string()),
107 Some(expr) => Ok(expr.schema_name().to_string()),
108 None => Err(dispose_parse_error(None)),
109 }
110}
111
112fn parse_duration_expr(args: &[Expr], i: usize) -> DFResult<Duration> {
117 match args.get(i) {
118 Some(Expr::Literal(ScalarValue::Utf8(Some(str)))) => {
119 parse_duration(str).map_err(DataFusionError::Plan)
120 }
121 Some(expr) => {
122 let ms = evaluate_expr_to_millisecond(args, i, true)?;
123 if ms <= 0 {
124 return Err(dispose_parse_error(Some(expr)));
125 }
126 Ok(Duration::from_millis(ms as u64))
127 }
128 None => Err(dispose_parse_error(None)),
129 }
130}
131
132fn evaluate_expr_to_millisecond(args: &[Expr], i: usize, interval_only: bool) -> DFResult<i64> {
140 let Some(expr) = args.get(i) else {
141 return Err(dispose_parse_error(None));
142 };
143 if interval_only && !interval_only_in_expr(expr) {
144 return Err(dispose_parse_error(Some(expr)));
145 }
146 let execution_props = ExecutionProps::new().with_query_execution_start_time(Utc::now());
147 let info = SimplifyContext::new(&execution_props).with_schema(Arc::new(DFSchema::empty()));
148 let simplify_expr = ExprSimplifier::new(info).simplify(expr.clone())?;
149 match simplify_expr {
150 Expr::Literal(ScalarValue::TimestampNanosecond(ts_nanos, _))
151 | Expr::Literal(ScalarValue::DurationNanosecond(ts_nanos)) => {
152 ts_nanos.map(|v| v / 1_000_000)
153 }
154 Expr::Literal(ScalarValue::TimestampMicrosecond(ts_micros, _))
155 | Expr::Literal(ScalarValue::DurationMicrosecond(ts_micros)) => {
156 ts_micros.map(|v| v / 1_000)
157 }
158 Expr::Literal(ScalarValue::TimestampMillisecond(ts_millis, _))
159 | Expr::Literal(ScalarValue::DurationMillisecond(ts_millis)) => ts_millis,
160 Expr::Literal(ScalarValue::TimestampSecond(ts_secs, _))
161 | Expr::Literal(ScalarValue::DurationSecond(ts_secs)) => ts_secs.map(|v| v * 1_000),
162 Expr::Literal(ScalarValue::IntervalYearMonth(interval)) => interval
164 .map(|v| {
165 let interval = IntervalYearMonth::from_i32(v);
166 if interval.months != 0 {
167 return Err(DataFusionError::Plan(format!(
168 "Year or month interval is not allowed in range query: {}",
169 expr.schema_name()
170 )));
171 }
172
173 Ok(0)
174 })
175 .transpose()?,
176 Expr::Literal(ScalarValue::IntervalDayTime(interval)) => interval.map(|v| {
177 let interval = IntervalDayTime::from(v);
178 interval.as_millis()
179 }),
180 Expr::Literal(ScalarValue::IntervalMonthDayNano(interval)) => interval
181 .map(|v| {
182 let interval = IntervalMonthDayNano::from(v);
183 if interval.months != 0 {
184 return Err(DataFusionError::Plan(format!(
185 "Year or month interval is not allowed in range query: {}",
186 expr.schema_name()
187 )));
188 }
189
190 Ok(interval.days as i64 * MS_PER_DAY + interval.nanoseconds / NANOS_PER_MILLI)
191 })
192 .transpose()?,
193 _ => None,
194 }
195 .ok_or_else(|| {
196 DataFusionError::Plan(format!(
197 "{} is not a expr can be evaluate and use in range query",
198 expr.schema_name()
199 ))
200 })
201}
202
203fn parse_align_to(args: &[Expr], i: usize, timezone: Option<&Timezone>) -> DFResult<i64> {
210 let Ok(s) = parse_str_expr(args, i) else {
211 return evaluate_expr_to_millisecond(args, i, false);
212 };
213 let upper = s.to_uppercase();
214 match upper.as_str() {
215 "NOW" => return Ok(Timestamp::current_millis().value()),
216 "" => return Ok(timezone.map(|tz| tz.local_minus_utc() * 1000).unwrap_or(0)),
218 _ => (),
219 }
220
221 Timestamp::from_str(s, timezone)
222 .map_err(|e| {
223 DataFusionError::Plan(format!(
224 "Illegal `align to` argument `{}` in range select query, can't be parse as NOW/CALENDAR/Timestamp, error: {}",
225 s, e
226 ))
227 })?.convert_to(TimeUnit::Millisecond).map(|x|x.value()).ok_or(DataFusionError::Plan(format!(
228 "Illegal `align to` argument `{}` in range select query, can't be convert to a valid Timestamp",
229 s
230 ))
231 )
232}
233
234fn parse_expr_list(args: &[Expr], start: usize, len: usize) -> DFResult<Vec<Expr>> {
235 let mut outs = Vec::with_capacity(len);
236 for i in start..start + len {
237 outs.push(match &args.get(i) {
238 Some(
239 Expr::Column(_) | Expr::Literal(_) | Expr::BinaryExpr(_) | Expr::ScalarFunction(_),
240 ) => args[i].clone(),
241 other => {
242 return Err(dispose_parse_error(*other));
243 }
244 });
245 }
246 Ok(outs)
247}
248
/// Checks that a freshly parsed range parameter agrees with the value already
/// stored on the rewriter, then stores it.
///
/// Invoked as `inconsistent_check!(self.field, cond)`. A local variable with the
/// same name as `field` must be in scope at the call site: it is the new value.
/// When `cond` holds and the stored value differs from the new one, the
/// enclosing function returns a planning error naming the field; otherwise the
/// new value is written into the field.
macro_rules! inconsistent_check {
    ($self: ident.$name: ident, $cond: expr) => {
        // `$cond` tells whether the field has already been set by an earlier
        // range expression; only then is a mismatch an error.
        if $cond && $self.$name != $name {
            return Err(DataFusionError::Plan(
                concat!(
                    "Inconsistent ",
                    stringify!($name),
                    " given in Range Function Rewrite"
                )
                .into(),
            ));
        } else {
            $self.$name = $name;
        }
    };
}
265
266impl TreeNodeRewriter for RangeExprRewriter<'_> {
267 type Node = Expr;
268
269 fn f_down(&mut self, node: Expr) -> DFResult<Transformed<Expr>> {
270 if let Expr::ScalarFunction(func) = &node {
271 if func.name() == "range_fn" {
272 let range_expr = self.get_range_expr(&func.args, 0)?;
275 let range = parse_duration_expr(&func.args, 1)?;
276 let byc = str::parse::<usize>(parse_str_expr(&func.args, 3)?)
277 .map_err(|e| DataFusionError::Plan(e.to_string()))?;
278 let by = parse_expr_list(&func.args, 4, byc)?;
279 let align = parse_duration_expr(&func.args, byc + 4)?;
280 let align_to =
281 parse_align_to(&func.args, byc + 5, Some(&self.query_ctx.timezone()))?;
282 let mut data_type = range_expr.get_type(self.input_plan.schema())?;
283 let mut need_cast = false;
284 let fill = Fill::try_from_str(parse_str_expr(&func.args, 2)?, &data_type)?;
285 if matches!(fill, Some(Fill::Linear)) && data_type.is_integer() {
286 data_type = DataType::Float64;
287 need_cast = true;
288 }
289 inconsistent_check!(self.by, !self.by.is_empty());
290 inconsistent_check!(self.align, self.align != Duration::default());
291 inconsistent_check!(self.align_to, self.align_to != 0);
292 let range_fn = RangeFn {
293 name: if let Some(fill) = &fill {
294 format!(
295 "{} RANGE {} FILL {}",
296 range_expr.schema_name(),
297 parse_expr_to_string(&func.args, 1)?,
298 fill
299 )
300 } else {
301 format!(
302 "{} RANGE {}",
303 range_expr.schema_name(),
304 parse_expr_to_string(&func.args, 1)?,
305 )
306 },
307 data_type,
308 expr: range_expr,
309 range,
310 fill,
311 need_cast,
312 };
313 let alias = Expr::Column(Column::from_name(range_fn.name.clone()));
314 self.range_fn.insert(range_fn);
315 return Ok(Transformed::yes(alias));
316 }
317 }
318 Ok(Transformed::no(node))
319 }
320}
321
/// Rewrites a logical plan so that projections containing `range_fn` calls are
/// replaced by a `RangeSelect` extension node (plus a projection on top when
/// one is still required).
pub struct RangePlanRewriter {
    /// Resolves table references so the time index and row key columns of the
    /// queried table can be looked up.
    table_provider: DfTableSourceProvider,
    /// Query context handed to the expression rewriter.
    query_ctx: QueryContextRef,
}
332
impl RangePlanRewriter {
    /// Creates a rewriter from a table source provider and a query context.
    pub fn new(table_provider: DfTableSourceProvider, query_ctx: QueryContextRef) -> Self {
        Self {
            table_provider,
            query_ctx,
        }
    }

    /// Rewrites `plan`, returning the rewritten plan, or `plan` itself when
    /// nothing in it needed rewriting.
    pub async fn rewrite(&mut self, plan: LogicalPlan) -> Result<LogicalPlan> {
        match self.rewrite_logical_plan(&plan).await? {
            Some(new_plan) => Ok(new_plan),
            None => Ok(plan),
        }
    }

    /// Recursively rewrites `plan`.
    ///
    /// Returns `Ok(Some(new_plan))` when the plan, or any plan below it, was
    /// rewritten, and `Ok(None)` when it can be kept as is.
    ///
    /// # Errors
    ///
    /// * a projection with range expressions sits on top of an aggregate whose
    ///   aggregate expressions themselves contain range expressions;
    /// * a projection with range expressions is not directly on top of an
    ///   aggregate;
    /// * any error raised while resolving tables, rewriting expressions or
    ///   rebuilding plans.
    #[async_recursion]
    async fn rewrite_logical_plan(&mut self, plan: &LogicalPlan) -> Result<Option<LogicalPlan>> {
        // Children are rewritten first; `None` marks an unchanged child.
        let inputs = plan.inputs();
        let mut new_inputs = Vec::with_capacity(inputs.len());
        for input in &inputs {
            new_inputs.push(self.rewrite_logical_plan(input).await?)
        }
        match plan {
            // A projection that still contains `range_fn` calls.
            // NOTE(review): this branch builds on the original `aggr.input` and
            // never reads `new_inputs`, so rewrites made further down the tree are
            // not carried over here — confirm this is intended.
            LogicalPlan::Projection(Projection { expr, input, .. })
                if have_range_in_exprs(expr) =>
            {
                let (aggr_plan, input) = if let LogicalPlan::Aggregate(aggr) = input.as_ref() {
                    // A range expression nested inside another one shows up as a
                    // `range_fn` call among the aggregate expressions.
                    if have_range_in_exprs(&aggr.aggr_expr) {
                        return RangeQuerySnafu {
                            msg: "Nest Range Query is not allowed",
                        }
                        .fail();
                    }
                    (aggr, aggr.input.clone())
                } else {
                    // The projection is not directly on top of an aggregate.
                    return RangeQuerySnafu {
                        msg: "Window functions is not allowed in Range Query",
                    }
                    .fail();
                };
                let (time_index, default_by) = self.get_index_by(input.schema()).await?;
                let mut range_rewriter = RangeExprRewriter {
                    input_plan: &input,
                    align: Duration::default(),
                    align_to: 0,
                    by: vec![],
                    range_fn: BTreeSet::new(),
                    sub_aggr: aggr_plan,
                    query_ctx: &self.query_ctx,
                };
                // Replace every `range_fn` call with a column reference while
                // collecting the range parameters on `range_rewriter`.
                let new_expr = expr
                    .iter()
                    .map(|expr| expr.clone().rewrite(&mut range_rewriter).map(|x| x.data))
                    .collect::<DFResult<Vec<_>>>()?;
                // No `by` expressions were collected: fall back to the table's default.
                if range_rewriter.by.is_empty() {
                    range_rewriter.by = default_by;
                }
                let range_select = RangeSelect::try_new(
                    input.clone(),
                    range_rewriter.range_fn.into_iter().collect(),
                    range_rewriter.align,
                    range_rewriter.align_to,
                    time_index,
                    range_rewriter.by,
                    &new_expr,
                )?;
                // When `RangeSelect` carries its own `schema_project`, it is returned
                // directly; otherwise the rewritten projection is put on top of it.
                let no_additional_project = range_select.schema_project.is_some();
                let range_plan = LogicalPlan::Extension(Extension {
                    node: Arc::new(range_select),
                });
                if no_additional_project {
                    Ok(Some(range_plan))
                } else {
                    let project_plan = LogicalPlanBuilder::from(range_plan)
                        .project(new_expr)
                        .and_then(|x| x.build())?;
                    Ok(Some(project_plan))
                }
            }
            _ => {
                if new_inputs.iter().any(|x| x.is_some()) {
                    // At least one child changed: pair each rewritten child with its
                    // original and keep the original where nothing changed.
                    let inputs: Vec<LogicalPlan> = new_inputs
                        .into_iter()
                        .zip(inputs)
                        .map(|(x, y)| match x {
                            Some(plan) => plan,
                            None => y.clone(),
                        })
                        .collect();
                    // Analyze, Explain and DistinctOn are rebuilt through
                    // `LogicalPlanBuilder`; every other plan goes through
                    // `with_new_exprs`.
                    // NOTE(review): presumably `with_new_exprs` cannot rebuild these
                    // variants correctly — confirm against the DataFusion version in use.
                    let plan = match plan {
                        LogicalPlan::Analyze(Analyze { verbose, .. }) => {
                            ensure!(
                                inputs.len() == 1,
                                RangeQuerySnafu {
                                    msg: "Illegal subplan nums when rewrite Analyze logical plan",
                                }
                            );
                            // The second argument selects the analyze form of `explain`.
                            LogicalPlanBuilder::from(inputs[0].clone())
                                .explain(*verbose, true)?
                                .build()
                        }
                        LogicalPlan::Explain(Explain { verbose, .. }) => {
                            ensure!(
                                inputs.len() == 1,
                                RangeQuerySnafu {
                                    msg: "Illegal subplan nums when rewrite Explain logical plan",
                                }
                            );
                            LogicalPlanBuilder::from(inputs[0].clone())
                                .explain(*verbose, false)?
                                .build()
                        }
                        LogicalPlan::Distinct(Distinct::On(DistinctOn {
                            on_expr,
                            select_expr,
                            sort_expr,
                            ..
                        })) => {
                            ensure!(
                                inputs.len() == 1,
                                RangeQuerySnafu {
                                    msg:
                                        "Illegal subplan nums when rewrite DistinctOn logical plan",
                                }
                            );
                            LogicalPlanBuilder::from(inputs[0].clone())
                                .distinct_on(
                                    on_expr.clone(),
                                    select_expr.clone(),
                                    sort_expr.clone(),
                                )?
                                .build()
                        }
                        _ => plan.with_new_exprs(plan.expressions_consider_join(), inputs),
                    }?;
                    Ok(Some(plan))
                } else {
                    // Nothing below changed, so the caller keeps the original plan.
                    Ok(None)
                }
            }
        }
    }

    /// Finds the time index column and the default `by` expressions for the
    /// tables referenced by `schema`.
    ///
    /// Returns `(time_index, default_by)`. `default_by` holds the row key columns
    /// of the table, or the literal `1` when the table has none. When several
    /// qualified fields are present, the values taken from the last one win.
    ///
    /// # Errors
    ///
    /// * a qualifier cannot be resolved to a table backed by
    ///   `DfTableProviderAdapter`;
    /// * a resolved table has no timestamp column;
    /// * no qualified field yields a time index of timestamp type.
    async fn get_index_by(&mut self, schema: &Arc<DFSchema>) -> Result<(Expr, Vec<Expr>)> {
        // The wildcard is only a placeholder meaning "no time index found yet".
        let mut time_index_expr = Expr::Wildcard {
            qualifier: None,
            options: Box::new(WildcardOptions::default()),
        };
        let mut default_by = vec![];
        for i in 0..schema.fields().len() {
            let (qualifier, _) = schema.qualified_field(i);
            // Fields without a table qualifier cannot be resolved and are skipped.
            if let Some(table_ref) = qualifier {
                let table = self
                    .table_provider
                    .resolve_table(table_ref.clone())
                    .await
                    .context(CatalogSnafu)?
                    .as_any()
                    .downcast_ref::<DefaultTableSource>()
                    .context(UnknownTableSnafu)?
                    .table_provider
                    .as_any()
                    .downcast_ref::<DfTableProviderAdapter>()
                    .context(UnknownTableSnafu)?
                    .table();
                // Shadows the `schema` parameter with the table's own schema
                // for the rest of this block.
                let schema = table.schema();
                let time_index_column =
                    schema
                        .timestamp_column()
                        .with_context(|| TimeIndexNotFoundSnafu {
                            table: table_ref.to_string(),
                        })?;
                if let ConcreteDataType::Timestamp(_) = time_index_column.data_type {
                    // The row key columns become the default `by` expressions.
                    default_by = table
                        .table_info()
                        .meta
                        .row_key_column_names()
                        .map(|key| Expr::Column(Column::new(Some(table_ref.clone()), key)))
                        .collect();
                    // Without row key columns a constant is used instead, which
                    // puts all rows into one group.
                    if default_by.is_empty() {
                        default_by = vec![Expr::Literal(ScalarValue::Int64(Some(1)))];
                    }
                    time_index_expr = Expr::Column(Column::new(
                        Some(table_ref.clone()),
                        time_index_column.name.clone(),
                    ));
                }
            }
        }
        // Still the placeholder: no table provided a usable time index.
        if matches!(time_index_expr, Expr::Wildcard { .. }) {
            TimeIndexNotFoundSnafu {
                table: schema.to_string(),
            }
            .fail()
        } else {
            Ok((time_index_expr, default_by))
        }
    }
}
544
545fn have_range_in_exprs(exprs: &[Expr]) -> bool {
546 exprs.iter().any(|expr| {
547 let mut find_range = false;
548 let _ = expr.apply(|expr| {
549 Ok(match expr {
550 Expr::ScalarFunction(func) if func.name() == "range_fn" => {
551 find_range = true;
552 TreeNodeRecursion::Stop
553 }
554 _ => TreeNodeRecursion::Continue,
555 })
556 });
557 find_range
558 })
559}
560
561fn interval_only_in_expr(expr: &Expr) -> bool {
562 let mut all_interval = true;
563 let _ = expr.apply(|expr| {
564 if matches!(
566 expr,
567 Expr::Cast(Cast{
568 expr,
569 data_type: DataType::Interval(_)
570 }) if matches!(&**expr, Expr::Literal(ScalarValue::Utf8(_)))
571 ) {
572 return Ok(TreeNodeRecursion::Stop);
575 }
576
577 if !matches!(
578 expr,
579 Expr::Literal(ScalarValue::IntervalDayTime(_))
580 | Expr::Literal(ScalarValue::IntervalMonthDayNano(_))
581 | Expr::Literal(ScalarValue::IntervalYearMonth(_))
582 | Expr::BinaryExpr(_)
583 | Expr::Cast(Cast {
584 data_type: DataType::Interval(_),
585 ..
586 })
587 ) {
588 all_interval = false;
589 Ok(TreeNodeRecursion::Stop)
590 } else {
591 Ok(TreeNodeRecursion::Continue)
592 }
593 });
594
595 all_interval
596}
597
598#[cfg(test)]
599mod test {
600
601 use arrow::datatypes::IntervalUnit;
602 use catalog::memory::MemoryCatalogManager;
603 use catalog::RegisterTableRequest;
604 use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
605 use common_time::IntervalYearMonth;
606 use datafusion_expr::{BinaryExpr, Operator};
607 use datatypes::prelude::ConcreteDataType;
608 use datatypes::schema::{ColumnSchema, Schema};
609 use session::context::QueryContext;
610 use table::metadata::{TableInfoBuilder, TableMetaBuilder};
611 use table::test_util::EmptyTable;
612
613 use super::*;
614 use crate::options::QueryOptions;
615 use crate::parser::QueryLanguageParser;
616 use crate::{QueryEngineFactory, QueryEngineRef};
617
618 async fn create_test_engine() -> QueryEngineRef {
619 let table_name = "test".to_string();
620 let mut columns = vec![];
621 for i in 0..5 {
622 columns.push(ColumnSchema::new(
623 format!("tag_{i}"),
624 ConcreteDataType::string_datatype(),
625 false,
626 ));
627 }
628 columns.push(
629 ColumnSchema::new(
630 "timestamp".to_string(),
631 ConcreteDataType::timestamp_millisecond_datatype(),
632 false,
633 )
634 .with_time_index(true),
635 );
636 for i in 0..5 {
637 columns.push(ColumnSchema::new(
638 format!("field_{i}"),
639 ConcreteDataType::float64_datatype(),
640 true,
641 ));
642 }
643 let schema = Arc::new(Schema::new(columns));
644 let table_meta = TableMetaBuilder::empty()
645 .schema(schema)
646 .primary_key_indices((0..5).collect())
647 .value_indices((6..11).collect())
648 .next_column_id(1024)
649 .build()
650 .unwrap();
651 let table_info = TableInfoBuilder::default()
652 .name(&table_name)
653 .meta(table_meta)
654 .build()
655 .unwrap();
656 let table = EmptyTable::from_table_info(&table_info);
657 let catalog_list = MemoryCatalogManager::with_default_setup();
658 assert!(catalog_list
659 .register_table_sync(RegisterTableRequest {
660 catalog: DEFAULT_CATALOG_NAME.to_string(),
661 schema: DEFAULT_SCHEMA_NAME.to_string(),
662 table_name,
663 table_id: 1024,
664 table,
665 })
666 .is_ok());
667 QueryEngineFactory::new(
668 catalog_list,
669 None,
670 None,
671 None,
672 None,
673 false,
674 QueryOptions::default(),
675 )
676 .query_engine()
677 }
678
679 async fn do_query(sql: &str) -> Result<LogicalPlan> {
680 let stmt = QueryLanguageParser::parse_sql(sql, &QueryContext::arc()).unwrap();
681 let engine = create_test_engine().await;
682 engine.planner().plan(&stmt, QueryContext::arc()).await
683 }
684
685 async fn query_plan_compare(sql: &str, expected: String) {
686 let plan = do_query(sql).await.unwrap();
687 assert_eq!(plan.display_indent_schema().to_string(), expected);
688 }
689
690 #[tokio::test]
691 async fn range_no_project() {
692 let query = r#"SELECT timestamp, tag_0, tag_1, avg(field_0 + field_1) RANGE '5m' FROM test ALIGN '1h' by (tag_0,tag_1);"#;
693 let expected = String::from(
694 "RangeSelect: range_exprs=[avg(test.field_0 + test.field_1) RANGE 5m], align=3600000ms, align_to=0ms, align_by=[test.tag_0, test.tag_1], time_index=timestamp [timestamp:Timestamp(Millisecond, None), tag_0:Utf8, tag_1:Utf8, avg(test.field_0 + test.field_1) RANGE 5m:Float64;N]\
695 \n TableScan: test [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N, field_3:Float64;N, field_4:Float64;N]"
696 );
697 query_plan_compare(query, expected).await;
698 }
699
700 #[tokio::test]
701 async fn range_expr_calculation() {
702 let query = r#"SELECT (avg(field_0 + field_1)/4) RANGE '5m' FROM test ALIGN '1h' by (tag_0,tag_1);"#;
703 let expected = String::from(
704 "Projection: avg(test.field_0 + test.field_1) RANGE 5m / Int64(4) [avg(test.field_0 + test.field_1) RANGE 5m / Int64(4):Float64;N]\
705 \n RangeSelect: range_exprs=[avg(test.field_0 + test.field_1) RANGE 5m], align=3600000ms, align_to=0ms, align_by=[test.tag_0, test.tag_1], time_index=timestamp [avg(test.field_0 + test.field_1) RANGE 5m:Float64;N, timestamp:Timestamp(Millisecond, None), tag_0:Utf8, tag_1:Utf8]\
706 \n TableScan: test [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N, field_3:Float64;N, field_4:Float64;N]"
707 );
708 query_plan_compare(query, expected).await;
709 }
710
711 #[tokio::test]
712 async fn range_multi_args() {
713 let query =
714 r#"SELECT (covar(field_0 + field_1, field_1)/4) RANGE '5m' FROM test ALIGN '1h';"#;
715 let expected = String::from(
716 "Projection: covar_samp(test.field_0 + test.field_1,test.field_1) RANGE 5m / Int64(4) [covar_samp(test.field_0 + test.field_1,test.field_1) RANGE 5m / Int64(4):Float64;N]\
717 \n RangeSelect: range_exprs=[covar_samp(test.field_0 + test.field_1,test.field_1) RANGE 5m], align=3600000ms, align_to=0ms, align_by=[test.tag_0, test.tag_1, test.tag_2, test.tag_3, test.tag_4], time_index=timestamp [covar_samp(test.field_0 + test.field_1,test.field_1) RANGE 5m:Float64;N, timestamp:Timestamp(Millisecond, None), tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8]\
718 \n TableScan: test [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N, field_3:Float64;N, field_4:Float64;N]"
719 );
720 query_plan_compare(query, expected).await;
721 }
722
723 #[tokio::test]
724 async fn range_calculation() {
725 let query = r#"SELECT ((avg(field_0)+sum(field_1))/4) RANGE '5m' FROM test ALIGN '1h' by (tag_0,tag_1) FILL NULL;"#;
726 let expected = String::from(
727 "Projection: (avg(test.field_0) RANGE 5m FILL NULL + sum(test.field_1) RANGE 5m FILL NULL) / Int64(4) [avg(test.field_0) RANGE 5m FILL NULL + sum(test.field_1) RANGE 5m FILL NULL / Int64(4):Float64;N]\
728 \n RangeSelect: range_exprs=[avg(test.field_0) RANGE 5m FILL NULL, sum(test.field_1) RANGE 5m FILL NULL], align=3600000ms, align_to=0ms, align_by=[test.tag_0, test.tag_1], time_index=timestamp [avg(test.field_0) RANGE 5m FILL NULL:Float64;N, sum(test.field_1) RANGE 5m FILL NULL:Float64;N, timestamp:Timestamp(Millisecond, None), tag_0:Utf8, tag_1:Utf8]\
729 \n TableScan: test [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N, field_3:Float64;N, field_4:Float64;N]"
730 );
731 query_plan_compare(query, expected).await;
732 }
733
734 #[tokio::test]
735 async fn range_as_sub_query() {
736 let query = r#"SELECT foo + 1 from (SELECT ((avg(field_0)+sum(field_1))/4) RANGE '5m' as foo FROM test ALIGN '1h' by (tag_0,tag_1) FILL NULL) where foo > 1;"#;
737 let expected = String::from(
738 "Projection: foo + Int64(1) [foo + Int64(1):Float64;N]\
739 \n Filter: foo > Int64(1) [foo:Float64;N]\
740 \n Projection: (avg(test.field_0) RANGE 5m FILL NULL + sum(test.field_1) RANGE 5m FILL NULL) / Int64(4) AS foo [foo:Float64;N]\
741 \n RangeSelect: range_exprs=[avg(test.field_0) RANGE 5m FILL NULL, sum(test.field_1) RANGE 5m FILL NULL], align=3600000ms, align_to=0ms, align_by=[test.tag_0, test.tag_1], time_index=timestamp [avg(test.field_0) RANGE 5m FILL NULL:Float64;N, sum(test.field_1) RANGE 5m FILL NULL:Float64;N, timestamp:Timestamp(Millisecond, None), tag_0:Utf8, tag_1:Utf8]\
742 \n TableScan: test [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N, field_3:Float64;N, field_4:Float64;N]"
743 );
744 query_plan_compare(query, expected).await;
745 }
746
747 #[tokio::test]
748 async fn range_from_nest_query() {
749 let query = r#"SELECT ((avg(a)+sum(b))/4) RANGE '5m' FROM (SELECT field_0 as a, field_1 as b, tag_0 as c, tag_1 as d, timestamp from test where field_0 > 1.0) ALIGN '1h' by (c, d) FILL NULL;"#;
750 let expected = String::from(
751 "Projection: (avg(a) RANGE 5m FILL NULL + sum(b) RANGE 5m FILL NULL) / Int64(4) [avg(a) RANGE 5m FILL NULL + sum(b) RANGE 5m FILL NULL / Int64(4):Float64;N]\
752 \n RangeSelect: range_exprs=[avg(a) RANGE 5m FILL NULL, sum(b) RANGE 5m FILL NULL], align=3600000ms, align_to=0ms, align_by=[c, d], time_index=timestamp [avg(a) RANGE 5m FILL NULL:Float64;N, sum(b) RANGE 5m FILL NULL:Float64;N, timestamp:Timestamp(Millisecond, None), c:Utf8, d:Utf8]\
753 \n Projection: test.field_0 AS a, test.field_1 AS b, test.tag_0 AS c, test.tag_1 AS d, test.timestamp [a:Float64;N, b:Float64;N, c:Utf8, d:Utf8, timestamp:Timestamp(Millisecond, None)]\
754 \n Filter: test.field_0 > Float64(1) [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N, field_3:Float64;N, field_4:Float64;N]\
755 \n TableScan: test [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N, field_3:Float64;N, field_4:Float64;N]"
756 );
757 query_plan_compare(query, expected).await;
758 }
759
760 #[tokio::test]
761 async fn range_in_expr() {
762 let query = r#"SELECT sin(avg(field_0 + field_1) RANGE '5m' + 1) FROM test ALIGN '1h' by (tag_0,tag_1);"#;
763 let expected = String::from(
764 "Projection: sin(avg(test.field_0 + test.field_1) RANGE 5m + Int64(1)) [sin(avg(test.field_0 + test.field_1) RANGE 5m + Int64(1)):Float64;N]\
765 \n RangeSelect: range_exprs=[avg(test.field_0 + test.field_1) RANGE 5m], align=3600000ms, align_to=0ms, align_by=[test.tag_0, test.tag_1], time_index=timestamp [avg(test.field_0 + test.field_1) RANGE 5m:Float64;N, timestamp:Timestamp(Millisecond, None), tag_0:Utf8, tag_1:Utf8]\
766 \n TableScan: test [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N, field_3:Float64;N, field_4:Float64;N]"
767 );
768 query_plan_compare(query, expected).await;
769 }
770
771 #[tokio::test]
772 async fn duplicate_range_expr() {
773 let query = r#"SELECT avg(field_0) RANGE '5m' FILL 6.0 + avg(field_0) RANGE '5m' FILL 6.0 FROM test ALIGN '1h' by (tag_0,tag_1);"#;
774 let expected = String::from(
775 "Projection: avg(test.field_0) RANGE 5m FILL 6 + avg(test.field_0) RANGE 5m FILL 6 [avg(test.field_0) RANGE 5m FILL 6 + avg(test.field_0) RANGE 5m FILL 6:Float64]\
776 \n RangeSelect: range_exprs=[avg(test.field_0) RANGE 5m FILL 6], align=3600000ms, align_to=0ms, align_by=[test.tag_0, test.tag_1], time_index=timestamp [avg(test.field_0) RANGE 5m FILL 6:Float64, timestamp:Timestamp(Millisecond, None), tag_0:Utf8, tag_1:Utf8]\
777 \n TableScan: test [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N, field_3:Float64;N, field_4:Float64;N]"
778 );
779 query_plan_compare(query, expected).await;
780 }
781
782 #[tokio::test]
783 async fn deep_nest_range_expr() {
784 let query = r#"SELECT round(sin(avg(field_0 + field_1) RANGE '5m' + 1)) FROM test ALIGN '1h' by (tag_0,tag_1);"#;
785 let expected = String::from(
786 "Projection: round(sin(avg(test.field_0 + test.field_1) RANGE 5m + Int64(1))) [round(sin(avg(test.field_0 + test.field_1) RANGE 5m + Int64(1))):Float64;N]\
787 \n RangeSelect: range_exprs=[avg(test.field_0 + test.field_1) RANGE 5m], align=3600000ms, align_to=0ms, align_by=[test.tag_0, test.tag_1], time_index=timestamp [avg(test.field_0 + test.field_1) RANGE 5m:Float64;N, timestamp:Timestamp(Millisecond, None), tag_0:Utf8, tag_1:Utf8]\
788 \n TableScan: test [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N, field_3:Float64;N, field_4:Float64;N]"
789 );
790 query_plan_compare(query, expected).await;
791 }
792
793 #[tokio::test]
794 async fn complex_range_expr() {
795 let query = r#"SELECT gcd(CAST(max(field_0 + 1) Range '5m' FILL NULL AS Int64), CAST(tag_0 AS Int64)) + round(max(field_2+1) Range '6m' FILL NULL + 1) + max(field_2+3) Range '10m' FILL NULL * CAST(tag_1 AS Float64) + 1 FROM test ALIGN '1h' by (tag_0, tag_1);"#;
796 let expected = String::from(
797 "Projection: gcd(arrow_cast(max(test.field_0 + Int64(1)) RANGE 5m FILL NULL, Utf8(\"Int64\")), arrow_cast(test.tag_0, Utf8(\"Int64\"))) + round(max(test.field_2 + Int64(1)) RANGE 6m FILL NULL + Int64(1)) + max(test.field_2 + Int64(3)) RANGE 10m FILL NULL * arrow_cast(test.tag_1, Utf8(\"Float64\")) + Int64(1) [gcd(arrow_cast(max(test.field_0 + Int64(1)) RANGE 5m FILL NULL,Utf8(\"Int64\")),arrow_cast(test.tag_0,Utf8(\"Int64\"))) + round(max(test.field_2 + Int64(1)) RANGE 6m FILL NULL + Int64(1)) + max(test.field_2 + Int64(3)) RANGE 10m FILL NULL * arrow_cast(test.tag_1,Utf8(\"Float64\")) + Int64(1):Float64;N]\
798 \n RangeSelect: range_exprs=[max(test.field_0 + Int64(1)) RANGE 5m FILL NULL, max(test.field_2 + Int64(1)) RANGE 6m FILL NULL, max(test.field_2 + Int64(3)) RANGE 10m FILL NULL], align=3600000ms, align_to=0ms, align_by=[test.tag_0, test.tag_1], time_index=timestamp [max(test.field_0 + Int64(1)) RANGE 5m FILL NULL:Float64;N, max(test.field_2 + Int64(1)) RANGE 6m FILL NULL:Float64;N, max(test.field_2 + Int64(3)) RANGE 10m FILL NULL:Float64;N, timestamp:Timestamp(Millisecond, None), tag_0:Utf8, tag_1:Utf8]\
799 \n TableScan: test [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N, field_3:Float64;N, field_4:Float64;N]"
800 );
801 query_plan_compare(query, expected).await;
802 }
803
804 #[tokio::test]
805 async fn range_linear_on_integer() {
806 let query = r#"SELECT min(CAST(field_0 AS Int64) + CAST(field_1 AS Int64)) RANGE '5m' FILL LINEAR FROM test ALIGN '1h' by (tag_0,tag_1);"#;
807 let expected = String::from(
808 "RangeSelect: range_exprs=[min(arrow_cast(test.field_0,Utf8(\"Int64\")) + arrow_cast(test.field_1,Utf8(\"Int64\"))) RANGE 5m FILL LINEAR], align=3600000ms, align_to=0ms, align_by=[test.tag_0, test.tag_1], time_index=timestamp [min(arrow_cast(test.field_0,Utf8(\"Int64\")) + arrow_cast(test.field_1,Utf8(\"Int64\"))) RANGE 5m FILL LINEAR:Float64;N]\
809 \n TableScan: test [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N, field_3:Float64;N, field_4:Float64;N]"
810 );
811 query_plan_compare(query, expected).await;
812 }
813
814 #[tokio::test]
815 async fn range_nest_range_err() {
816 let query = r#"SELECT sum(avg(field_0 + field_1) RANGE '5m' + 1) RANGE '5m' + 1 FROM test ALIGN '1h' by (tag_0,tag_1);"#;
817 assert_eq!(
818 do_query(query).await.unwrap_err().to_string(),
819 "Range Query: Nest Range Query is not allowed"
820 )
821 }
822
823 #[tokio::test]
824 async fn range_argument_err_1() {
827 let query = r#"SELECT range_fn('5m', avg(field_0), 'NULL', '1', tag_0, '1h') FROM test group by tag_0;"#;
828 let error = do_query(query).await.unwrap_err().to_string();
829 assert_eq!(
830 error,
831 "Error during planning: Illegal argument `Utf8(\"5m\")` in range select query"
832 )
833 }
834
835 #[tokio::test]
836 async fn range_argument_err_2() {
837 let query = r#"SELECT range_fn(avg(field_0), 5, 'NULL', '1', tag_0, '1h') FROM test group by tag_0;"#;
838 let error = do_query(query).await.unwrap_err().to_string();
839 assert_eq!(
840 error,
841 "Error during planning: Illegal argument `Int64(5)` in range select query"
842 )
843 }
844
/// Checks `parse_duration_expr` against interval literals, duration strings, casts
/// and binary expressions, including the inputs it has to reject.
#[test]
fn test_parse_duration_expr() {
    // Local builders that keep the individual cases short.
    let day_time_lit = |days, millis| {
        Expr::Literal(ScalarValue::IntervalDayTime(Some(
            IntervalDayTime::new(days, millis).into(),
        )))
    };
    let binary_arg = |left: Expr, op: Operator, right: Expr| {
        vec![Expr::BinaryExpr(BinaryExpr {
            left: Box::new(left),
            op,
            right: Box::new(right),
        })]
    };

    // A year-month interval literal is rejected.
    let year_month = IntervalYearMonth::new(10);
    let year_month_args = vec![Expr::Literal(ScalarValue::IntervalYearMonth(Some(
        year_month.to_i32(),
    )))];
    assert!(parse_duration_expr(&year_month_args, 0).is_err());

    // A day-time interval literal yields the interval's own millisecond length.
    let day_time = IntervalDayTime::new(10, 10);
    let day_time_args = vec![Expr::Literal(ScalarValue::IntervalDayTime(Some(
        day_time.into(),
    )))];
    let day_time_millis = parse_duration_expr(&day_time_args, 0).unwrap().as_millis() as i64;
    assert_eq!(day_time_millis, day_time.as_millis());

    // A month-day-nano literal with zero months: days and nanoseconds both contribute.
    let month_day_nano = IntervalMonthDayNano::new(0, 10, 10);
    let month_day_nano_args = vec![Expr::Literal(ScalarValue::IntervalMonthDayNano(Some(
        month_day_nano.into(),
    )))];
    let month_day_nano_millis = parse_duration_expr(&month_day_nano_args, 0)
        .unwrap()
        .as_millis() as i64;
    assert_eq!(
        month_day_nano_millis,
        month_day_nano.days as i64 * MS_PER_DAY + month_day_nano.nanoseconds / NANOS_PER_MILLI,
    );

    // A duration string literal gives the same result as `parse_duration` on that string.
    let text_args = vec![Expr::Literal(ScalarValue::Utf8(Some("1y4w".into())))];
    assert_eq!(
        parse_duration_expr(&text_args, 0).unwrap(),
        parse_duration("1y4w").unwrap()
    );

    // A string literal cast to an interval type: "15 minutes" equals the duration "15m".
    let cast_args = vec![Expr::Cast(Cast {
        expr: Box::new(Expr::Literal(ScalarValue::Utf8(Some("15 minutes".into())))),
        data_type: DataType::Interval(IntervalUnit::MonthDayNano),
    })];
    assert_eq!(
        parse_duration_expr(&cast_args, 0).unwrap(),
        parse_duration("15m").unwrap()
    );
    // An index beyond the end of the argument list is an error.
    assert!(parse_duration_expr(&cast_args, 10).is_err());

    // Adding two 10ms intervals gives 20ms.
    let sum_args = binary_arg(day_time_lit(0, 10), Operator::Plus, day_time_lit(0, 10));
    assert_eq!(
        parse_duration_expr(&sum_args, 0).unwrap(),
        Duration::from_millis(20)
    );

    // Subtracting two equal intervals is rejected
    // (presumably because the result is zero -- confirm against `parse_duration_expr`).
    let diff_args = binary_arg(day_time_lit(0, 10), Operator::Minus, day_time_lit(0, 10));
    assert!(parse_duration_expr(&diff_args, 0).is_err());

    // A year-month interval minus a time-of-day literal is rejected.
    let mixed_args = binary_arg(
        Expr::Literal(ScalarValue::IntervalYearMonth(Some(
            IntervalYearMonth::new(10).to_i32(),
        ))),
        Operator::Minus,
        Expr::Literal(ScalarValue::Time64Microsecond(Some(0))),
    );
    assert!(parse_duration_expr(&mixed_args, 0).is_err());
}
923
/// Checks `parse_align_to` for the "NOW" keyword, the empty string with and without
/// a timezone, explicit timestamp strings, and an interval expression.
#[test]
fn test_parse_align_to() {
    // Wraps a string in a single-element argument list holding a Utf8 literal.
    let utf8_arg = |text: &str| vec![Expr::Literal(ScalarValue::Utf8(Some(text.into())))];

    // "NOW" resolves to within 100ms of the current time.
    let now_args = utf8_arg("NOW");
    let aligned_now = parse_align_to(&now_args, 0, None).unwrap();
    let epsilon = aligned_now - Timestamp::current_millis().value();
    assert!(epsilon.abs() < 100);

    // The empty string without a timezone aligns to 0.
    let empty_args = utf8_arg("");
    assert_eq!(0, parse_align_to(&empty_args, 0, None).unwrap());

    // The empty string with a timezone yields a timezone-dependent value.
    let empty_args = utf8_arg("");
    let hst = Timezone::from_tz_string("HST").unwrap();
    assert_eq!(
        -36000 * 1000,
        parse_align_to(&empty_args, 0, Some(&hst)).unwrap()
    );
    let shanghai = Timezone::from_tz_string("Asia/Shanghai").unwrap();
    assert_eq!(
        28800 * 1000,
        parse_align_to(&empty_args, 0, Some(&shanghai)).unwrap()
    );

    // A timestamp string carrying its own +08:00 offset, no session timezone.
    let offset_args = utf8_arg("1970-01-01T00:00:00+08:00");
    assert_eq!(
        parse_align_to(&offset_args, 0, None).unwrap(),
        -8 * 60 * 60 * 1000
    );

    // A timestamp string without offset, combined with the Asia/Shanghai timezone.
    let naive_args = utf8_arg("1970-01-01T00:00:00");
    let shanghai = Timezone::from_tz_string("Asia/Shanghai").unwrap();
    assert_eq!(
        parse_align_to(&naive_args, 0, Some(&shanghai)).unwrap(),
        -8 * 60 * 60 * 1000
    );

    // An interval expression: 10ms + 10ms aligns to 20.
    let ten_millis = || {
        Expr::Literal(ScalarValue::IntervalDayTime(Some(
            IntervalDayTime::new(0, 10).into(),
        )))
    };
    let sum_args = vec![Expr::BinaryExpr(BinaryExpr {
        left: Box::new(ten_millis()),
        op: Operator::Plus,
        right: Box::new(ten_millis()),
    })];
    assert_eq!(parse_align_to(&sum_args, 0, None).unwrap(), 20);
}
979
/// Checks which expression trees `interval_only_in_expr` accepts: a duration literal
/// makes the check fail, while interval literals and string-to-interval casts
/// (also when nested inside another cast) pass.
#[test]
fn test_interval_only() {
    // Small builders for the expression shapes used below.
    let ten_days = || {
        Expr::Literal(ScalarValue::IntervalDayTime(Some(
            IntervalDayTime::new(10, 0).into(),
        )))
    };
    let minus = |left: Expr, right: Expr| {
        Expr::BinaryExpr(BinaryExpr {
            left: Box::new(left),
            op: Operator::Minus,
            right: Box::new(right),
        })
    };
    let cast_to_interval = |inner: Expr| {
        Expr::Cast(Cast {
            expr: Box::new(inner),
            data_type: DataType::Interval(IntervalUnit::MonthDayNano),
        })
    };
    let quarter_hour_text =
        || Expr::Literal(ScalarValue::Utf8(Some("15 minute".to_string())));

    // A duration literal on one side is not accepted.
    let duration_minus_interval = minus(
        Expr::Literal(ScalarValue::DurationMillisecond(Some(20))),
        ten_days(),
    );
    assert!(!interval_only_in_expr(&duration_minus_interval));

    // Interval literals on both sides are accepted.
    let interval_minus_interval = minus(ten_days(), ten_days());
    assert!(interval_only_in_expr(&interval_minus_interval));

    // A string cast to an interval counts as an interval operand.
    let cast_minus_interval = minus(cast_to_interval(quarter_hour_text()), ten_days());
    assert!(interval_only_in_expr(&cast_minus_interval));

    // The same subtraction wrapped in an outer cast to interval is accepted as well.
    let outer_cast = cast_to_interval(minus(
        cast_to_interval(quarter_hour_text()),
        ten_days(),
    ));
    assert!(interval_only_in_expr(&outer_cast));
}
1033}