use std::collections::BTreeSet;
use std::sync::Arc;
use std::time::Duration;

use arrow_schema::DataType;
use async_recursion::async_recursion;
use catalog::table_source::DfTableSourceProvider;
use chrono::Utc;
use common_time::interval::{MS_PER_DAY, NANOS_PER_MILLI};
use common_time::timestamp::TimeUnit;
use common_time::{IntervalDayTime, IntervalMonthDayNano, IntervalYearMonth, Timestamp, Timezone};
use datafusion::datasource::DefaultTableSource;
use datafusion::prelude::Column;
use datafusion::scalar::ScalarValue;
use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRecursion, TreeNodeRewriter};
use datafusion_common::{DFSchema, DataFusionError, Result as DFResult};
use datafusion_expr::execution_props::ExecutionProps;
use datafusion_expr::expr::WildcardOptions;
use datafusion_expr::simplify::SimplifyContext;
use datafusion_expr::{
    Aggregate, Analyze, Cast, Distinct, DistinctOn, Explain, Expr, ExprSchemable, Extension,
    Literal, LogicalPlan, LogicalPlanBuilder, Projection,
};
use datafusion_optimizer::simplify_expressions::ExprSimplifier;
use datatypes::prelude::ConcreteDataType;
use promql_parser::util::parse_duration;
use session::context::QueryContextRef;
use snafu::{OptionExt, ResultExt, ensure};
use table::table::adapter::DfTableProviderAdapter;

use crate::error::{
    CatalogSnafu, RangeQuerySnafu, Result, TimeIndexNotFoundSnafu, UnknownTableSnafu,
};
use crate::plan::ExtractExpr;
use crate::range_select::plan::{Fill, RangeFn, RangeSelect};

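/// Rewrites `range_fn` marker calls in projection expressions into [`RangeFn`]
/// descriptors, replacing each call with a column reference to the computed
/// range aggregation and collecting the shared `align`/`align to`/`by` settings.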
pub struct RangeExprRewriter<'a> {
    input_plan: &'a Arc<LogicalPlan>,
    align: Duration,
    align_to: i64,
    by: Vec<Expr>,
    range_fn: BTreeSet<RangeFn>,
    sub_aggr: &'a Aggregate,
    query_ctx: &'a QueryContextRef,
}

impl RangeExprRewriter<'_> {
    pub fn get_range_expr(&self, args: &[Expr], i: usize) -> DFResult<Expr> {
        match args.get(i) {
            Some(Expr::Column(column)) => {
                let index = self.sub_aggr.schema.index_of_column(column)?;
                let len = self.sub_aggr.group_expr.len();
                self.sub_aggr
                    .aggr_expr
                    .get(index - len)
                    .cloned()
                    .ok_or(DataFusionError::Plan(
                        "Range expr not found in underlying Aggregate Plan".into(),
                    ))
            }
            Some(Expr::Alias(alias)) => {
                self.get_range_expr(std::slice::from_ref(alias.expr.as_ref()), 0)
            }
            other => Err(dispose_parse_error(other)),
        }
    }
}

#[inline]
fn dispose_parse_error(expr: Option<&Expr>) -> DataFusionError {
    DataFusionError::Plan(
        expr.map(|x| {
            format!(
                "Illegal argument `{}` in range select query",
                x.schema_name()
            )
        })
        .unwrap_or("Missing argument in range select query".into()),
    )
}

fn parse_str_expr(args: &[Expr], i: usize) -> DFResult<&str> {
    match args.get(i) {
        Some(Expr::Literal(ScalarValue::Utf8(Some(str)), _)) => Ok(str.as_str()),
        other => Err(dispose_parse_error(other)),
    }
}

fn parse_expr_to_string(args: &[Expr], i: usize) -> DFResult<String> {
    match args.get(i) {
        Some(Expr::Literal(ScalarValue::Utf8(Some(str)), _)) => Ok(str.clone()),
        Some(expr) => Ok(expr.schema_name().to_string()),
        None => Err(dispose_parse_error(None)),
    }
}

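/// Parses a duration argument: either a literal duration string understood by
/// `promql_parser::util::parse_duration` (e.g. `'5m'`), or an interval-only
/// expression that evaluates to a positive number of milliseconds.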
fn parse_duration_expr(args: &[Expr], i: usize) -> DFResult<Duration> {
    match args.get(i) {
        Some(Expr::Literal(ScalarValue::Utf8(Some(str)), _)) => {
            parse_duration(str).map_err(DataFusionError::Plan)
        }
        Some(expr) => {
            let ms = evaluate_expr_to_millisecond(args, i, true)?;
            if ms <= 0 {
                return Err(dispose_parse_error(Some(expr)));
            }
            Ok(Duration::from_millis(ms as u64))
        }
        None => Err(dispose_parse_error(None)),
    }
}

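/// Simplifies `args[i]` into a constant and converts it to milliseconds.
/// Accepts timestamp, duration and interval literals; year/month intervals are
/// rejected because their length in milliseconds is not fixed. When
/// `interval_only` is set, the expression may only be built from intervals.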
fn evaluate_expr_to_millisecond(args: &[Expr], i: usize, interval_only: bool) -> DFResult<i64> {
    let Some(expr) = args.get(i) else {
        return Err(dispose_parse_error(None));
    };
    if interval_only && !interval_only_in_expr(expr) {
        return Err(dispose_parse_error(Some(expr)));
    }
    let execution_props = ExecutionProps::new().with_query_execution_start_time(Utc::now());
    let info = SimplifyContext::new(&execution_props).with_schema(Arc::new(DFSchema::empty()));
    let simplify_expr = ExprSimplifier::new(info).simplify(expr.clone())?;
    match simplify_expr {
        Expr::Literal(ScalarValue::TimestampNanosecond(ts_nanos, _), _)
        | Expr::Literal(ScalarValue::DurationNanosecond(ts_nanos), _) => {
            ts_nanos.map(|v| v / 1_000_000)
        }
        Expr::Literal(ScalarValue::TimestampMicrosecond(ts_micros, _), _)
        | Expr::Literal(ScalarValue::DurationMicrosecond(ts_micros), _) => {
            ts_micros.map(|v| v / 1_000)
        }
        Expr::Literal(ScalarValue::TimestampMillisecond(ts_millis, _), _)
        | Expr::Literal(ScalarValue::DurationMillisecond(ts_millis), _) => ts_millis,
        Expr::Literal(ScalarValue::TimestampSecond(ts_secs, _), _)
        | Expr::Literal(ScalarValue::DurationSecond(ts_secs), _) => ts_secs.map(|v| v * 1_000),
        Expr::Literal(ScalarValue::IntervalYearMonth(interval), _) => interval
            .map(|v| {
                let interval = IntervalYearMonth::from_i32(v);
                if interval.months != 0 {
                    return Err(DataFusionError::Plan(format!(
                        "Year or month interval is not allowed in range query: {}",
                        expr.schema_name()
                    )));
                }

                Ok(0)
            })
            .transpose()?,
        Expr::Literal(ScalarValue::IntervalDayTime(interval), _) => interval.map(|v| {
            let interval = IntervalDayTime::from(v);
            interval.as_millis()
        }),
        Expr::Literal(ScalarValue::IntervalMonthDayNano(interval), _) => interval
            .map(|v| {
                let interval = IntervalMonthDayNano::from(v);
                if interval.months != 0 {
                    return Err(DataFusionError::Plan(format!(
                        "Year or month interval is not allowed in range query: {}",
                        expr.schema_name()
                    )));
                }

                Ok(interval.days as i64 * MS_PER_DAY + interval.nanoseconds / NANOS_PER_MILLI)
            })
            .transpose()?,
        _ => None,
    }
    .ok_or_else(|| {
        DataFusionError::Plan(format!(
            "{} is not an expr that can be evaluated and used in range query",
            expr.schema_name()
        ))
    })
}

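/// Resolves the `align to` argument to an epoch offset in milliseconds:
/// `NOW` aligns to the current timestamp, an empty string aligns to the
/// calendar in the query timezone (the timezone's UTC offset), and any other
/// string is parsed as a timestamp; non-string expressions are evaluated as
/// constants via [`evaluate_expr_to_millisecond`].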
fn parse_align_to(args: &[Expr], i: usize, timezone: Option<&Timezone>) -> DFResult<i64> {
    let Ok(s) = parse_str_expr(args, i) else {
        return evaluate_expr_to_millisecond(args, i, false);
    };
    let upper = s.to_uppercase();
    match upper.as_str() {
        "NOW" => return Ok(Timestamp::current_millis().value()),
        "" => return Ok(timezone.map(|tz| tz.local_minus_utc() * 1000).unwrap_or(0)),
        _ => (),
    }

    Timestamp::from_str(s, timezone)
        .map_err(|e| {
            DataFusionError::Plan(format!(
                "Illegal `align to` argument `{}` in range select query, can't be parsed as NOW/CALENDAR/Timestamp, error: {}",
                s, e
            ))
        })?
        .convert_to(TimeUnit::Millisecond)
        .map(|x| x.value())
        .ok_or(DataFusionError::Plan(format!(
            "Illegal `align to` argument `{}` in range select query, can't be converted to a valid Timestamp",
            s
        )))
}

fn parse_expr_list(args: &[Expr], start: usize, len: usize) -> DFResult<Vec<Expr>> {
    let mut outs = Vec::with_capacity(len);
    for i in start..start + len {
        outs.push(match &args.get(i) {
            Some(
                Expr::Column(_)
                | Expr::Literal(_, _)
                | Expr::BinaryExpr(_)
                | Expr::ScalarFunction(_),
            ) => args[i].clone(),
            Some(Expr::Alias(alias)) if matches!(*alias.expr, Expr::ScalarFunction(_)) => {
                args[i].clone()
            }
            other => {
                return Err(dispose_parse_error(*other));
            }
        });
    }
    Ok(outs)
}

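/// Ensures that every `range_fn` call in one query uses the same value for the
/// given field (`align`, `align_to`, `by`): the first occurrence is recorded,
/// and a later mismatch raises a plan error.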
macro_rules! inconsistent_check {
    ($self: ident.$name: ident, $cond: expr) => {
        if $cond && $self.$name != $name {
            return Err(DataFusionError::Plan(
                concat!(
                    "Inconsistent ",
                    stringify!($name),
                    " given in Range Function Rewrite"
                )
                .into(),
            ));
        } else {
            $self.$name = $name;
        }
    };
}

impl TreeNodeRewriter for RangeExprRewriter<'_> {
    type Node = Expr;

    fn f_down(&mut self, node: Expr) -> DFResult<Transformed<Expr>> {
        if let Expr::ScalarFunction(func) = &node
            && func.name() == "range_fn"
        {
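            // Argument layout of the `range_fn` marker:
            // (range_expr, range, fill, byc, [by columns; byc], align, align_to).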
            let range_expr = self.get_range_expr(&func.args, 0)?;
            let range = parse_duration_expr(&func.args, 1)?;
            let byc = str::parse::<usize>(parse_str_expr(&func.args, 3)?)
                .map_err(|e| DataFusionError::Plan(e.to_string()))?;
            let by = parse_expr_list(&func.args, 4, byc)?;
            let align = parse_duration_expr(&func.args, byc + 4)?;
            let align_to = parse_align_to(&func.args, byc + 5, Some(&self.query_ctx.timezone()))?;
            let mut data_type = range_expr.get_type(self.input_plan.schema())?;
            let mut need_cast = false;
            let fill = Fill::try_from_str(parse_str_expr(&func.args, 2)?, &data_type)?;
            if matches!(fill, Some(Fill::Linear)) && data_type.is_integer() {
                data_type = DataType::Float64;
                need_cast = true;
            }
            inconsistent_check!(self.by, !self.by.is_empty());
            inconsistent_check!(self.align, self.align != Duration::default());
            inconsistent_check!(self.align_to, self.align_to != 0);
            let range_fn = RangeFn {
                name: if let Some(fill) = &fill {
                    format!(
                        "{} RANGE {} FILL {}",
                        range_expr.schema_name(),
                        parse_expr_to_string(&func.args, 1)?,
                        fill
                    )
                } else {
                    format!(
                        "{} RANGE {}",
                        range_expr.schema_name(),
                        parse_expr_to_string(&func.args, 1)?,
                    )
                },
                data_type,
                expr: range_expr,
                range,
                fill,
                need_cast,
            };
            let alias = Expr::Column(Column::from_name(range_fn.name.clone()));
            self.range_fn.insert(range_fn);
            return Ok(Transformed::yes(alias));
        }
        Ok(Transformed::no(node))
    }
}

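/// Rewrites a logical plan that contains range expressions: the
/// `Projection` over an `Aggregate` produced by the SQL planner is replaced by
/// a [`RangeSelect`] extension node, with an extra projection added on top
/// when the range plan's schema does not already match the query output.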
pub struct RangePlanRewriter {
    table_provider: DfTableSourceProvider,
    query_ctx: QueryContextRef,
}

impl RangePlanRewriter {
    pub fn new(table_provider: DfTableSourceProvider, query_ctx: QueryContextRef) -> Self {
        Self {
            table_provider,
            query_ctx,
        }
    }

    pub async fn rewrite(&mut self, plan: LogicalPlan) -> Result<LogicalPlan> {
        match self.rewrite_logical_plan(&plan).await? {
            Some(new_plan) => Ok(new_plan),
            None => Ok(plan),
        }
    }

    #[async_recursion]
    async fn rewrite_logical_plan(&mut self, plan: &LogicalPlan) -> Result<Option<LogicalPlan>> {
        let inputs = plan.inputs();
        let mut new_inputs = Vec::with_capacity(inputs.len());
        for input in &inputs {
            new_inputs.push(self.rewrite_logical_plan(input).await?)
        }
        match plan {
            LogicalPlan::Projection(Projection { expr, input, .. })
                if have_range_in_exprs(expr) =>
            {
                let (aggr_plan, input) = if let LogicalPlan::Aggregate(aggr) = input.as_ref() {
                    if have_range_in_exprs(&aggr.aggr_expr) {
                        return RangeQuerySnafu {
                            msg: "Nest Range Query is not allowed",
                        }
                        .fail();
                    }
                    (aggr, aggr.input.clone())
                } else {
                    return RangeQuerySnafu {
                        msg: "Window functions is not allowed in Range Query",
                    }
                    .fail();
                };
                let (time_index, default_by) = self.get_index_by(input.schema()).await?;
                let mut range_rewriter = RangeExprRewriter {
                    input_plan: &input,
                    align: Duration::default(),
                    align_to: 0,
                    by: vec![],
                    range_fn: BTreeSet::new(),
                    sub_aggr: aggr_plan,
                    query_ctx: &self.query_ctx,
                };
                let new_expr = expr
                    .iter()
                    .map(|expr| expr.clone().rewrite(&mut range_rewriter).map(|x| x.data))
                    .collect::<DFResult<Vec<_>>>()?;
                if range_rewriter.by.is_empty() {
                    range_rewriter.by = default_by;
                }
                let range_select = RangeSelect::try_new(
                    input.clone(),
                    range_rewriter.range_fn.into_iter().collect(),
                    range_rewriter.align,
                    range_rewriter.align_to,
                    time_index,
                    range_rewriter.by,
                    &new_expr,
                )?;
                let no_additional_project = range_select.schema_project.is_some();
                let range_plan = LogicalPlan::Extension(Extension {
                    node: Arc::new(range_select),
                });
                if no_additional_project {
                    Ok(Some(range_plan))
                } else {
                    let project_plan = LogicalPlanBuilder::from(range_plan)
                        .project(new_expr)
                        .and_then(|x| x.build())?;
                    Ok(Some(project_plan))
                }
            }
            _ => {
                if new_inputs.iter().any(|x| x.is_some()) {
                    let inputs: Vec<LogicalPlan> = new_inputs
                        .into_iter()
                        .zip(inputs)
                        .map(|(x, y)| match x {
                            Some(plan) => plan,
                            None => y.clone(),
                        })
                        .collect();
                    let plan = match plan {
                        LogicalPlan::Analyze(Analyze { verbose, .. }) => {
                            ensure!(
                                inputs.len() == 1,
                                RangeQuerySnafu {
                                    msg: "Illegal subplan nums when rewrite Analyze logical plan",
                                }
                            );
                            LogicalPlanBuilder::from(inputs[0].clone())
                                .explain(*verbose, true)?
                                .build()
                        }
                        LogicalPlan::Explain(Explain { verbose, .. }) => {
                            ensure!(
                                inputs.len() == 1,
                                RangeQuerySnafu {
                                    msg: "Illegal subplan nums when rewrite Explain logical plan",
                                }
                            );
                            LogicalPlanBuilder::from(inputs[0].clone())
                                .explain(*verbose, false)?
                                .build()
                        }
                        LogicalPlan::Distinct(Distinct::On(DistinctOn {
                            on_expr,
                            select_expr,
                            sort_expr,
                            ..
                        })) => {
                            ensure!(
                                inputs.len() == 1,
                                RangeQuerySnafu {
                                    msg: "Illegal subplan nums when rewrite DistinctOn logical plan",
                                }
                            );
                            LogicalPlanBuilder::from(inputs[0].clone())
                                .distinct_on(
                                    on_expr.clone(),
                                    select_expr.clone(),
                                    sort_expr.clone(),
                                )?
                                .build()
                        }
                        _ => plan.with_new_exprs(plan.expressions_consider_join(), inputs),
                    }?;
                    Ok(Some(plan))
                } else {
                    Ok(None)
                }
            }
        }
    }

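    /// Finds the time index column of the underlying table and the default
    /// `align by` columns (the table's tag columns, or a constant when the
    /// table has no tags).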
    async fn get_index_by(&mut self, schema: &Arc<DFSchema>) -> Result<(Expr, Vec<Expr>)> {
        #[allow(deprecated)]
        let mut time_index_expr = Expr::Wildcard {
            qualifier: None,
            options: Box::new(WildcardOptions::default()),
        };
        let mut default_by = vec![];
        for i in 0..schema.fields().len() {
            let (qualifier, _) = schema.qualified_field(i);
            if let Some(table_ref) = qualifier {
                let table = self
                    .table_provider
                    .resolve_table(table_ref.clone())
                    .await
                    .context(CatalogSnafu)?
                    .as_any()
                    .downcast_ref::<DefaultTableSource>()
                    .context(UnknownTableSnafu)?
                    .table_provider
                    .as_any()
                    .downcast_ref::<DfTableProviderAdapter>()
                    .context(UnknownTableSnafu)?
                    .table();
                let schema = table.schema();
                let time_index_column =
                    schema
                        .timestamp_column()
                        .with_context(|| TimeIndexNotFoundSnafu {
                            table: table_ref.to_string(),
                        })?;
                if let ConcreteDataType::Timestamp(_) = time_index_column.data_type {
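                    // Default `align by`: the table's tag (primary key) columns.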
                    default_by = table
                        .table_info()
                        .meta
                        .row_key_column_names()
                        .map(|key| Expr::Column(Column::new(Some(table_ref.clone()), key)))
                        .collect();
                    if default_by.is_empty() {
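                        // No tag columns: align by a constant so all rows form one series.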
                        default_by = vec![1.lit()];
                    }
                    time_index_expr = Expr::Column(Column::new(
                        Some(table_ref.clone()),
                        time_index_column.name.clone(),
                    ));
                }
            }
        }
        #[allow(deprecated)]
        if matches!(time_index_expr, Expr::Wildcard { .. }) {
            TimeIndexNotFoundSnafu {
                table: schema.to_string(),
            }
            .fail()
        } else {
            Ok((time_index_expr, default_by))
        }
    }
}

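/// Returns true if any expression in `exprs` contains a `range_fn` marker call.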
fn have_range_in_exprs(exprs: &[Expr]) -> bool {
    exprs.iter().any(|expr| {
        let mut find_range = false;
        let _ = expr.apply(|expr| {
            Ok(match expr {
                Expr::ScalarFunction(func) if func.name() == "range_fn" => {
                    find_range = true;
                    TreeNodeRecursion::Stop
                }
                _ => TreeNodeRecursion::Continue,
            })
        });
        find_range
    })
}

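/// Returns true if `expr` is built only from interval literals, casts to an
/// interval type (including string literals cast to intervals), and binary
/// operations over them.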
fn interval_only_in_expr(expr: &Expr) -> bool {
    let mut all_interval = true;
    let _ = expr.apply(|expr| {
        if matches!(
            expr,
            Expr::Cast(Cast {
                expr,
                data_type: DataType::Interval(_)
            }) if matches!(&**expr, Expr::Literal(ScalarValue::Utf8(_), _))
        ) {
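            // A string literal cast to an interval (e.g. '15 minutes'::INTERVAL) is
            // interval-only as a whole; stop so the inner Utf8 literal is not visited.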
            return Ok(TreeNodeRecursion::Stop);
        }

        if !matches!(
            expr,
            Expr::Literal(ScalarValue::IntervalDayTime(_), _)
                | Expr::Literal(ScalarValue::IntervalMonthDayNano(_), _)
                | Expr::Literal(ScalarValue::IntervalYearMonth(_), _)
                | Expr::BinaryExpr(_)
                | Expr::Cast(Cast {
                    data_type: DataType::Interval(_),
                    ..
                })
        ) {
            all_interval = false;
            Ok(TreeNodeRecursion::Stop)
        } else {
            Ok(TreeNodeRecursion::Continue)
        }
    });

    all_interval
}

#[cfg(test)]
mod test {

    use arrow::datatypes::IntervalUnit;
    use catalog::RegisterTableRequest;
    use catalog::memory::MemoryCatalogManager;
    use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
    use common_time::IntervalYearMonth;
    use datafusion_expr::{BinaryExpr, Literal, Operator};
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::{ColumnSchema, Schema};
    use session::context::QueryContext;
    use table::metadata::{TableInfoBuilder, TableMetaBuilder};
    use table::test_util::EmptyTable;

    use super::*;
    use crate::options::QueryOptions;
    use crate::parser::QueryLanguageParser;
    use crate::{QueryEngineFactory, QueryEngineRef};

    async fn create_test_engine() -> QueryEngineRef {
        let table_name = "test".to_string();
        let mut columns = vec![];
        for i in 0..5 {
            columns.push(ColumnSchema::new(
                format!("tag_{i}"),
                ConcreteDataType::string_datatype(),
                false,
            ));
        }
        columns.push(
            ColumnSchema::new(
                "timestamp".to_string(),
                ConcreteDataType::timestamp_millisecond_datatype(),
                false,
            )
            .with_time_index(true),
        );
        for i in 0..5 {
            columns.push(ColumnSchema::new(
                format!("field_{i}"),
                ConcreteDataType::float64_datatype(),
                true,
            ));
        }
        let schema = Arc::new(Schema::new(columns));
        let table_meta = TableMetaBuilder::empty()
            .schema(schema)
            .primary_key_indices((0..5).collect())
            .value_indices((6..11).collect())
            .next_column_id(1024)
            .build()
            .unwrap();
        let table_info = TableInfoBuilder::default()
            .name(&table_name)
            .meta(table_meta)
            .build()
            .unwrap();
        let table = EmptyTable::from_table_info(&table_info);
        let catalog_list = MemoryCatalogManager::with_default_setup();
        assert!(
            catalog_list
                .register_table_sync(RegisterTableRequest {
                    catalog: DEFAULT_CATALOG_NAME.to_string(),
                    schema: DEFAULT_SCHEMA_NAME.to_string(),
                    table_name,
                    table_id: 1024,
                    table,
                })
                .is_ok()
        );
        QueryEngineFactory::new(
            catalog_list,
            None,
            None,
            None,
            None,
            false,
            QueryOptions::default(),
        )
        .query_engine()
    }

    async fn do_query(sql: &str) -> Result<LogicalPlan> {
        let stmt = QueryLanguageParser::parse_sql(sql, &QueryContext::arc()).unwrap();
        let engine = create_test_engine().await;
        engine.planner().plan(&stmt, QueryContext::arc()).await
    }

    async fn query_plan_compare(sql: &str, expected: String) {
        let plan = do_query(sql).await.unwrap();
        assert_eq!(plan.display_indent_schema().to_string(), expected);
    }

    #[tokio::test]
    async fn range_no_project() {
        let query = r#"SELECT timestamp, tag_0, tag_1, avg(field_0 + field_1) RANGE '5m' FROM test ALIGN '1h' by (tag_0,tag_1);"#;
        let expected = String::from(
            "RangeSelect: range_exprs=[avg(test.field_0 + test.field_1) RANGE 5m], align=3600000ms, align_to=0ms, align_by=[test.tag_0, test.tag_1], time_index=timestamp [timestamp:Timestamp(Millisecond, None), tag_0:Utf8, tag_1:Utf8, avg(test.field_0 + test.field_1) RANGE 5m:Float64;N]\
            \n  TableScan: test [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N, field_3:Float64;N, field_4:Float64;N]",
        );
        query_plan_compare(query, expected).await;
    }

    #[tokio::test]
    async fn range_expr_calculation() {
        let query = r#"SELECT (avg(field_0 + field_1)/4) RANGE '5m' FROM test ALIGN '1h' by (tag_0,tag_1);"#;
        let expected = String::from(
            "Projection: avg(test.field_0 + test.field_1) RANGE 5m / Int64(4) [avg(test.field_0 + test.field_1) RANGE 5m / Int64(4):Float64;N]\
            \n  RangeSelect: range_exprs=[avg(test.field_0 + test.field_1) RANGE 5m], align=3600000ms, align_to=0ms, align_by=[test.tag_0, test.tag_1], time_index=timestamp [avg(test.field_0 + test.field_1) RANGE 5m:Float64;N, timestamp:Timestamp(Millisecond, None), tag_0:Utf8, tag_1:Utf8]\
            \n    TableScan: test [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N, field_3:Float64;N, field_4:Float64;N]",
        );
        query_plan_compare(query, expected).await;
    }

    #[tokio::test]
    async fn range_multi_args() {
        let query =
            r#"SELECT (covar(field_0 + field_1, field_1)/4) RANGE '5m' FROM test ALIGN '1h';"#;
        let expected = String::from(
            "Projection: covar_samp(test.field_0 + test.field_1,test.field_1) RANGE 5m / Int64(4) [covar_samp(test.field_0 + test.field_1,test.field_1) RANGE 5m / Int64(4):Float64;N]\
            \n  RangeSelect: range_exprs=[covar_samp(test.field_0 + test.field_1,test.field_1) RANGE 5m], align=3600000ms, align_to=0ms, align_by=[test.tag_0, test.tag_1, test.tag_2, test.tag_3, test.tag_4], time_index=timestamp [covar_samp(test.field_0 + test.field_1,test.field_1) RANGE 5m:Float64;N, timestamp:Timestamp(Millisecond, None), tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8]\
            \n    TableScan: test [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N, field_3:Float64;N, field_4:Float64;N]",
        );
        query_plan_compare(query, expected).await;
    }

    #[tokio::test]
    async fn range_calculation() {
        let query = r#"SELECT ((avg(field_0)+sum(field_1))/4) RANGE '5m' FROM test ALIGN '1h' by (tag_0,tag_1) FILL NULL;"#;
        let expected = String::from(
            "Projection: (avg(test.field_0) RANGE 5m FILL NULL + sum(test.field_1) RANGE 5m FILL NULL) / Int64(4) [avg(test.field_0) RANGE 5m FILL NULL + sum(test.field_1) RANGE 5m FILL NULL / Int64(4):Float64;N]\
            \n  RangeSelect: range_exprs=[avg(test.field_0) RANGE 5m FILL NULL, sum(test.field_1) RANGE 5m FILL NULL], align=3600000ms, align_to=0ms, align_by=[test.tag_0, test.tag_1], time_index=timestamp [avg(test.field_0) RANGE 5m FILL NULL:Float64;N, sum(test.field_1) RANGE 5m FILL NULL:Float64;N, timestamp:Timestamp(Millisecond, None), tag_0:Utf8, tag_1:Utf8]\
            \n    TableScan: test [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N, field_3:Float64;N, field_4:Float64;N]",
        );
        query_plan_compare(query, expected).await;
    }

    #[tokio::test]
    async fn range_as_sub_query() {
        let query = r#"SELECT foo + 1 from (SELECT ((avg(field_0)+sum(field_1))/4) RANGE '5m' as foo FROM test ALIGN '1h' by (tag_0,tag_1) FILL NULL) where foo > 1;"#;
        let expected = String::from(
            "Projection: foo + Int64(1) [foo + Int64(1):Float64;N]\
            \n  Filter: foo > Int64(1) [foo:Float64;N]\
            \n    Projection: (avg(test.field_0) RANGE 5m FILL NULL + sum(test.field_1) RANGE 5m FILL NULL) / Int64(4) AS foo [foo:Float64;N]\
            \n      RangeSelect: range_exprs=[avg(test.field_0) RANGE 5m FILL NULL, sum(test.field_1) RANGE 5m FILL NULL], align=3600000ms, align_to=0ms, align_by=[test.tag_0, test.tag_1], time_index=timestamp [avg(test.field_0) RANGE 5m FILL NULL:Float64;N, sum(test.field_1) RANGE 5m FILL NULL:Float64;N, timestamp:Timestamp(Millisecond, None), tag_0:Utf8, tag_1:Utf8]\
            \n        TableScan: test [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N, field_3:Float64;N, field_4:Float64;N]",
        );
        query_plan_compare(query, expected).await;
    }

    #[tokio::test]
    async fn range_from_nest_query() {
        let query = r#"SELECT ((avg(a)+sum(b))/4) RANGE '5m' FROM (SELECT field_0 as a, field_1 as b, tag_0 as c, tag_1 as d, timestamp from test where field_0 > 1.0) ALIGN '1h' by (c, d) FILL NULL;"#;
        let expected = String::from(
            "Projection: (avg(a) RANGE 5m FILL NULL + sum(b) RANGE 5m FILL NULL) / Int64(4) [avg(a) RANGE 5m FILL NULL + sum(b) RANGE 5m FILL NULL / Int64(4):Float64;N]\
            \n  RangeSelect: range_exprs=[avg(a) RANGE 5m FILL NULL, sum(b) RANGE 5m FILL NULL], align=3600000ms, align_to=0ms, align_by=[c, d], time_index=timestamp [avg(a) RANGE 5m FILL NULL:Float64;N, sum(b) RANGE 5m FILL NULL:Float64;N, timestamp:Timestamp(Millisecond, None), c:Utf8, d:Utf8]\
            \n    Projection: test.field_0 AS a, test.field_1 AS b, test.tag_0 AS c, test.tag_1 AS d, test.timestamp [a:Float64;N, b:Float64;N, c:Utf8, d:Utf8, timestamp:Timestamp(Millisecond, None)]\
            \n      Filter: test.field_0 > Float64(1) [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N, field_3:Float64;N, field_4:Float64;N]\
            \n        TableScan: test [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N, field_3:Float64;N, field_4:Float64;N]",
        );
        query_plan_compare(query, expected).await;
    }

    #[tokio::test]
    async fn range_in_expr() {
        let query = r#"SELECT sin(avg(field_0 + field_1) RANGE '5m' + 1) FROM test ALIGN '1h' by (tag_0,tag_1);"#;
        let expected = String::from(
            "Projection: sin(avg(test.field_0 + test.field_1) RANGE 5m + Int64(1)) [sin(avg(test.field_0 + test.field_1) RANGE 5m + Int64(1)):Float64;N]\
            \n  RangeSelect: range_exprs=[avg(test.field_0 + test.field_1) RANGE 5m], align=3600000ms, align_to=0ms, align_by=[test.tag_0, test.tag_1], time_index=timestamp [avg(test.field_0 + test.field_1) RANGE 5m:Float64;N, timestamp:Timestamp(Millisecond, None), tag_0:Utf8, tag_1:Utf8]\
            \n    TableScan: test [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N, field_3:Float64;N, field_4:Float64;N]",
        );
        query_plan_compare(query, expected).await;
    }

    #[tokio::test]
    async fn duplicate_range_expr() {
        let query = r#"SELECT avg(field_0) RANGE '5m' FILL 6.0 + avg(field_0) RANGE '5m' FILL 6.0 FROM test ALIGN '1h' by (tag_0,tag_1);"#;
        let expected = String::from(
            "Projection: avg(test.field_0) RANGE 5m FILL 6 + avg(test.field_0) RANGE 5m FILL 6 [avg(test.field_0) RANGE 5m FILL 6 + avg(test.field_0) RANGE 5m FILL 6:Float64]\
            \n  RangeSelect: range_exprs=[avg(test.field_0) RANGE 5m FILL 6], align=3600000ms, align_to=0ms, align_by=[test.tag_0, test.tag_1], time_index=timestamp [avg(test.field_0) RANGE 5m FILL 6:Float64, timestamp:Timestamp(Millisecond, None), tag_0:Utf8, tag_1:Utf8]\
            \n    TableScan: test [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N, field_3:Float64;N, field_4:Float64;N]",
        );
        query_plan_compare(query, expected).await;
    }

    #[tokio::test]
    async fn deep_nest_range_expr() {
        let query = r#"SELECT round(sin(avg(field_0 + field_1) RANGE '5m' + 1)) FROM test ALIGN '1h' by (tag_0,tag_1);"#;
        let expected = String::from(
            "Projection: round(sin(avg(test.field_0 + test.field_1) RANGE 5m + Int64(1))) [round(sin(avg(test.field_0 + test.field_1) RANGE 5m + Int64(1))):Float64;N]\
            \n  RangeSelect: range_exprs=[avg(test.field_0 + test.field_1) RANGE 5m], align=3600000ms, align_to=0ms, align_by=[test.tag_0, test.tag_1], time_index=timestamp [avg(test.field_0 + test.field_1) RANGE 5m:Float64;N, timestamp:Timestamp(Millisecond, None), tag_0:Utf8, tag_1:Utf8]\
            \n    TableScan: test [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N, field_3:Float64;N, field_4:Float64;N]",
        );
        query_plan_compare(query, expected).await;
    }

    #[tokio::test]
    async fn complex_range_expr() {
        let query = r#"SELECT gcd(CAST(max(field_0 + 1) Range '5m' FILL NULL AS Int64), CAST(tag_0 AS Int64)) + round(max(field_2+1) Range '6m' FILL NULL + 1) + max(field_2+3) Range '10m' FILL NULL * CAST(tag_1 AS Float64) + 1 FROM test ALIGN '1h' by (tag_0, tag_1);"#;
        let expected = String::from(
            "Projection: gcd(arrow_cast(max(test.field_0 + Int64(1)) RANGE 5m FILL NULL, Utf8(\"Int64\")), arrow_cast(test.tag_0, Utf8(\"Int64\"))) + round(max(test.field_2 + Int64(1)) RANGE 6m FILL NULL + Int64(1)) + max(test.field_2 + Int64(3)) RANGE 10m FILL NULL * arrow_cast(test.tag_1, Utf8(\"Float64\")) + Int64(1) [gcd(arrow_cast(max(test.field_0 + Int64(1)) RANGE 5m FILL NULL,Utf8(\"Int64\")),arrow_cast(test.tag_0,Utf8(\"Int64\"))) + round(max(test.field_2 + Int64(1)) RANGE 6m FILL NULL + Int64(1)) + max(test.field_2 + Int64(3)) RANGE 10m FILL NULL * arrow_cast(test.tag_1,Utf8(\"Float64\")) + Int64(1):Float64;N]\
            \n  RangeSelect: range_exprs=[max(test.field_0 + Int64(1)) RANGE 5m FILL NULL, max(test.field_2 + Int64(1)) RANGE 6m FILL NULL, max(test.field_2 + Int64(3)) RANGE 10m FILL NULL], align=3600000ms, align_to=0ms, align_by=[test.tag_0, test.tag_1], time_index=timestamp [max(test.field_0 + Int64(1)) RANGE 5m FILL NULL:Float64;N, max(test.field_2 + Int64(1)) RANGE 6m FILL NULL:Float64;N, max(test.field_2 + Int64(3)) RANGE 10m FILL NULL:Float64;N, timestamp:Timestamp(Millisecond, None), tag_0:Utf8, tag_1:Utf8]\
            \n    TableScan: test [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N, field_3:Float64;N, field_4:Float64;N]",
        );
        query_plan_compare(query, expected).await;
    }

    #[tokio::test]
    async fn range_linear_on_integer() {
        let query = r#"SELECT min(CAST(field_0 AS Int64) + CAST(field_1 AS Int64)) RANGE '5m' FILL LINEAR FROM test ALIGN '1h' by (tag_0,tag_1);"#;
        let expected = String::from(
            "RangeSelect: range_exprs=[min(arrow_cast(test.field_0,Utf8(\"Int64\")) + arrow_cast(test.field_1,Utf8(\"Int64\"))) RANGE 5m FILL LINEAR], align=3600000ms, align_to=0ms, align_by=[test.tag_0, test.tag_1], time_index=timestamp [min(arrow_cast(test.field_0,Utf8(\"Int64\")) + arrow_cast(test.field_1,Utf8(\"Int64\"))) RANGE 5m FILL LINEAR:Float64;N]\
            \n  TableScan: test [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N, field_3:Float64;N, field_4:Float64;N]",
        );
        query_plan_compare(query, expected).await;
    }

    #[tokio::test]
    async fn range_nest_range_err() {
        let query = r#"SELECT sum(avg(field_0 + field_1) RANGE '5m' + 1) RANGE '5m' + 1 FROM test ALIGN '1h' by (tag_0,tag_1);"#;
        assert_eq!(
            do_query(query).await.unwrap_err().to_string(),
            "Range Query: Nest Range Query is not allowed"
        )
    }

    #[tokio::test]
    async fn range_argument_err_1() {
        let query = r#"SELECT range_fn('5m', avg(field_0), 'NULL', '1', tag_0, '1h') FROM test group by tag_0;"#;
        let error = do_query(query).await.unwrap_err().to_string();
        assert_eq!(
            error,
            "Error during planning: Illegal argument `Utf8(\"5m\")` in range select query"
        )
    }

    #[tokio::test]
    async fn range_argument_err_2() {
        let query = r#"SELECT range_fn(avg(field_0), 5, 'NULL', '1', tag_0, '1h') FROM test group by tag_0;"#;
        let error = do_query(query).await.unwrap_err().to_string();
        assert_eq!(
            error,
            "Error during planning: Illegal argument `Int64(5)` in range select query"
        )
    }

    #[test]
    fn test_parse_duration_expr() {
        let interval = IntervalYearMonth::new(10);
        let args = vec![ScalarValue::IntervalYearMonth(Some(interval.to_i32())).lit()];
        assert!(parse_duration_expr(&args, 0).is_err());
        let interval = IntervalDayTime::new(10, 10);
        let args = vec![ScalarValue::IntervalDayTime(Some(interval.into())).lit()];
        assert_eq!(
            parse_duration_expr(&args, 0).unwrap().as_millis() as i64,
            interval.as_millis()
        );
        let interval = IntervalMonthDayNano::new(0, 10, 10);
        let args = vec![ScalarValue::IntervalMonthDayNano(Some(interval.into())).lit()];
        assert_eq!(
            parse_duration_expr(&args, 0).unwrap().as_millis() as i64,
            interval.days as i64 * MS_PER_DAY + interval.nanoseconds / NANOS_PER_MILLI,
        );
        let args = vec!["1y4w".lit()];
        assert_eq!(
            parse_duration_expr(&args, 0).unwrap(),
            parse_duration("1y4w").unwrap()
        );
        let args = vec![Expr::Cast(Cast {
            expr: Box::new("15 minutes".lit()),
            data_type: DataType::Interval(IntervalUnit::MonthDayNano),
        })];
        assert_eq!(
            parse_duration_expr(&args, 0).unwrap(),
            parse_duration("15m").unwrap()
        );
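
        // An out-of-range argument index is rejected.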
891 assert!(parse_duration_expr(&args, 10).is_err());
893 let args = vec![Expr::BinaryExpr(BinaryExpr {
895 left: Box::new(
896 ScalarValue::IntervalDayTime(Some(IntervalDayTime::new(0, 10).into())).lit(),
897 ),
898 op: Operator::Plus,
899 right: Box::new(
900 ScalarValue::IntervalDayTime(Some(IntervalDayTime::new(0, 10).into())).lit(),
901 ),
902 })];
903 assert_eq!(
904 parse_duration_expr(&args, 0).unwrap(),
905 Duration::from_millis(20)
906 );
907 let args = vec![Expr::BinaryExpr(BinaryExpr {
908 left: Box::new(
909 ScalarValue::IntervalDayTime(Some(IntervalDayTime::new(0, 10).into())).lit(),
910 ),
911 op: Operator::Minus,
912 right: Box::new(
913 ScalarValue::IntervalDayTime(Some(IntervalDayTime::new(0, 10).into())).lit(),
914 ),
915 })];
916 assert!(parse_duration_expr(&args, 0).is_err());
918 let args = vec![Expr::BinaryExpr(BinaryExpr {
920 left: Box::new(
921 ScalarValue::IntervalYearMonth(Some(IntervalYearMonth::new(10).to_i32())).lit(),
922 ),
923 op: Operator::Minus,
924 right: Box::new(ScalarValue::Time64Microsecond(Some(0)).lit()),
925 })];
926 assert!(parse_duration_expr(&args, 0).is_err());
927 }
928
929 #[test]
930 fn test_parse_align_to() {
931 let args = vec!["NOW".lit()];
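        // `align to` NOW resolves to roughly the current timestamp.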
        let epsilon =
            parse_align_to(&args, 0, None).unwrap() - Timestamp::current_millis().value();
        assert!(epsilon.abs() < 100);
        let args = vec!["".lit()];
        assert_eq!(0, parse_align_to(&args, 0, None).unwrap());
        let args = vec!["".lit()];
        assert_eq!(
            -36000 * 1000,
            parse_align_to(&args, 0, Some(&Timezone::from_tz_string("HST").unwrap())).unwrap()
        );
        assert_eq!(
            28800 * 1000,
            parse_align_to(
                &args,
                0,
                Some(&Timezone::from_tz_string("Asia/Shanghai").unwrap())
            )
            .unwrap()
        );

        let args = vec!["1970-01-01T00:00:00+08:00".lit()];
        assert_eq!(parse_align_to(&args, 0, None).unwrap(), -8 * 60 * 60 * 1000);
        let args = vec!["1970-01-01T00:00:00".lit()];
        assert_eq!(
            parse_align_to(
                &args,
                0,
                Some(&Timezone::from_tz_string("Asia/Shanghai").unwrap())
            )
            .unwrap(),
            -8 * 60 * 60 * 1000
        );
        let args = vec![Expr::BinaryExpr(BinaryExpr {
            left: Box::new(
                ScalarValue::IntervalDayTime(Some(IntervalDayTime::new(0, 10).into())).lit(),
            ),
            op: Operator::Plus,
            right: Box::new(
                ScalarValue::IntervalDayTime(Some(IntervalDayTime::new(0, 10).into())).lit(),
            ),
        })];
        assert_eq!(parse_align_to(&args, 0, None).unwrap(), 20);
    }

    #[test]
    fn test_interval_only() {
        let expr = Expr::BinaryExpr(BinaryExpr {
            left: Box::new(ScalarValue::DurationMillisecond(Some(20)).lit()),
            op: Operator::Minus,
            right: Box::new(
                ScalarValue::IntervalDayTime(Some(IntervalDayTime::new(10, 0).into())).lit(),
            ),
        });
        assert!(!interval_only_in_expr(&expr));
        let expr = Expr::BinaryExpr(BinaryExpr {
            left: Box::new(
                ScalarValue::IntervalDayTime(Some(IntervalDayTime::new(10, 0).into())).lit(),
            ),
            op: Operator::Minus,
            right: Box::new(
                ScalarValue::IntervalDayTime(Some(IntervalDayTime::new(10, 0).into())).lit(),
            ),
        });
        assert!(interval_only_in_expr(&expr));

        let expr = Expr::BinaryExpr(BinaryExpr {
            left: Box::new(Expr::Cast(Cast {
                expr: Box::new("15 minute".lit()),
                data_type: DataType::Interval(IntervalUnit::MonthDayNano),
            })),
            op: Operator::Minus,
            right: Box::new(
                ScalarValue::IntervalDayTime(Some(IntervalDayTime::new(10, 0).into())).lit(),
            ),
        });
        assert!(interval_only_in_expr(&expr));

        let expr = Expr::Cast(Cast {
            expr: Box::new(Expr::BinaryExpr(BinaryExpr {
                left: Box::new(Expr::Cast(Cast {
                    expr: Box::new("15 minute".lit()),
                    data_type: DataType::Interval(IntervalUnit::MonthDayNano),
                })),
                op: Operator::Minus,
                right: Box::new(
                    ScalarValue::IntervalDayTime(Some(IntervalDayTime::new(10, 0).into())).lit(),
                ),
            })),
            data_type: DataType::Interval(IntervalUnit::MonthDayNano),
        });

        assert!(interval_only_in_expr(&expr));
    }
}