1use std::collections::BTreeSet;
16use std::sync::Arc;
17use std::time::Duration;
18
19use arrow_schema::DataType;
20use async_recursion::async_recursion;
21use catalog::table_source::DfTableSourceProvider;
22use common_time::interval::{MS_PER_DAY, NANOS_PER_MILLI};
23use common_time::timestamp::TimeUnit;
24use common_time::{IntervalDayTime, IntervalMonthDayNano, IntervalYearMonth, Timestamp, Timezone};
25use datafusion::datasource::DefaultTableSource;
26use datafusion::prelude::Column;
27use datafusion::scalar::ScalarValue;
28use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRecursion, TreeNodeRewriter};
29use datafusion_common::{DFSchema, DataFusionError, Result as DFResult};
30use datafusion_expr::expr::WildcardOptions;
31use datafusion_expr::simplify::SimplifyContext;
32use datafusion_expr::{
33 Aggregate, Analyze, Cast, Distinct, DistinctOn, Explain, Expr, ExprSchemable, Extension,
34 Literal, LogicalPlan, LogicalPlanBuilder, Projection,
35};
36use datafusion_optimizer::simplify_expressions::ExprSimplifier;
37use datatypes::prelude::ConcreteDataType;
38use datatypes::schema::TIME_INDEX_KEY;
39use promql_parser::util::parse_duration;
40use session::context::QueryContextRef;
41use snafu::{OptionExt, ResultExt, ensure};
42use table::table::adapter::DfTableProviderAdapter;
43
44use crate::error::{
45 CatalogSnafu, RangeQuerySnafu, Result, TimeIndexNotFoundSnafu, UnknownTableSnafu,
46};
47use crate::plan::ExtractExpr;
48use crate::range_select::plan::{Fill, RangeFn, RangeSelect};
49
50pub struct RangeExprRewriter<'a> {
54 input_plan: &'a Arc<LogicalPlan>,
55 align: Duration,
56 align_to: i64,
57 by: Vec<Expr>,
58 range_fn: BTreeSet<RangeFn>,
60 sub_aggr: &'a Aggregate,
61 query_ctx: &'a QueryContextRef,
62}
63
64impl RangeExprRewriter<'_> {
65 pub fn get_range_expr(&self, args: &[Expr], i: usize) -> DFResult<Expr> {
66 match args.get(i) {
67 Some(Expr::Column(column)) => {
68 let index = self.sub_aggr.schema.index_of_column(column)?;
69 let len = self.sub_aggr.group_expr.len();
70 self.sub_aggr
71 .aggr_expr
72 .get(index - len)
73 .cloned()
74 .ok_or(DataFusionError::Plan(
75 "Range expr not found in underlying Aggregate Plan".into(),
76 ))
77 }
78 Some(Expr::Alias(alias)) => {
79 self.get_range_expr(std::slice::from_ref(alias.expr.as_ref()), 0)
80 }
81 other => Err(dispose_parse_error(other)),
82 }
83 }
84}
85
86#[inline]
87fn dispose_parse_error(expr: Option<&Expr>) -> DataFusionError {
88 DataFusionError::Plan(
89 expr.map(|x| {
90 format!(
91 "Illegal argument `{}` in range select query",
92 x.schema_name()
93 )
94 })
95 .unwrap_or("Missing argument in range select query".into()),
96 )
97}
98
99fn parse_str_expr(args: &[Expr], i: usize) -> DFResult<&str> {
100 match args.get(i) {
101 Some(Expr::Literal(ScalarValue::Utf8(Some(str)), _)) => Ok(str.as_str()),
102 other => Err(dispose_parse_error(other)),
103 }
104}
105
106fn parse_expr_to_string(args: &[Expr], i: usize) -> DFResult<String> {
107 match args.get(i) {
108 Some(Expr::Literal(ScalarValue::Utf8(Some(str)), _)) => Ok(str.clone()),
109 Some(expr) => Ok(expr.schema_name().to_string()),
110 None => Err(dispose_parse_error(None)),
111 }
112}
113
114fn parse_duration_expr(args: &[Expr], i: usize) -> DFResult<Duration> {
119 match args.get(i) {
120 Some(Expr::Literal(ScalarValue::Utf8(Some(str)), _)) => {
121 parse_duration(str).map_err(DataFusionError::Plan)
122 }
123 Some(expr) => {
124 let ms = evaluate_expr_to_millisecond(args, i, true)?;
125 if ms <= 0 {
126 return Err(dispose_parse_error(Some(expr)));
127 }
128 Ok(Duration::from_millis(ms as u64))
129 }
130 None => Err(dispose_parse_error(None)),
131 }
132}
133
134fn evaluate_expr_to_millisecond(args: &[Expr], i: usize, interval_only: bool) -> DFResult<i64> {
142 let Some(expr) = args.get(i) else {
143 return Err(dispose_parse_error(None));
144 };
145 if interval_only && !interval_only_in_expr(expr) {
146 return Err(dispose_parse_error(Some(expr)));
147 }
148 let info = SimplifyContext::default().with_current_time();
149 let simplify_expr = ExprSimplifier::new(info).simplify(expr.clone())?;
150 match simplify_expr {
151 Expr::Literal(ScalarValue::TimestampNanosecond(ts_nanos, _), _)
152 | Expr::Literal(ScalarValue::DurationNanosecond(ts_nanos), _) => {
153 ts_nanos.map(|v| v / 1_000_000)
154 }
155 Expr::Literal(ScalarValue::TimestampMicrosecond(ts_micros, _), _)
156 | Expr::Literal(ScalarValue::DurationMicrosecond(ts_micros), _) => {
157 ts_micros.map(|v| v / 1_000)
158 }
159 Expr::Literal(ScalarValue::TimestampMillisecond(ts_millis, _), _)
160 | Expr::Literal(ScalarValue::DurationMillisecond(ts_millis), _) => ts_millis,
161 Expr::Literal(ScalarValue::TimestampSecond(ts_secs, _), _)
162 | Expr::Literal(ScalarValue::DurationSecond(ts_secs), _) => ts_secs.map(|v| v * 1_000),
163 Expr::Literal(ScalarValue::IntervalYearMonth(interval), _) => interval
165 .map(|v| {
166 let interval = IntervalYearMonth::from_i32(v);
167 if interval.months != 0 {
168 return Err(DataFusionError::Plan(format!(
169 "Year or month interval is not allowed in range query: {}",
170 expr.schema_name()
171 )));
172 }
173
174 Ok(0)
175 })
176 .transpose()?,
177 Expr::Literal(ScalarValue::IntervalDayTime(interval), _) => interval.map(|v| {
178 let interval = IntervalDayTime::from(v);
179 interval.as_millis()
180 }),
181 Expr::Literal(ScalarValue::IntervalMonthDayNano(interval), _) => interval
182 .map(|v| {
183 let interval = IntervalMonthDayNano::from(v);
184 if interval.months != 0 {
185 return Err(DataFusionError::Plan(format!(
186 "Year or month interval is not allowed in range query: {}",
187 expr.schema_name()
188 )));
189 }
190
191 Ok(interval.days as i64 * MS_PER_DAY + interval.nanoseconds / NANOS_PER_MILLI)
192 })
193 .transpose()?,
194 _ => None,
195 }
196 .ok_or_else(|| {
197 DataFusionError::Plan(format!(
198 "{} is not a expr can be evaluate and use in range query",
199 expr.schema_name()
200 ))
201 })
202}
203
204fn parse_align_to(args: &[Expr], i: usize, timezone: Option<&Timezone>) -> DFResult<i64> {
211 let Ok(s) = parse_str_expr(args, i) else {
212 return evaluate_expr_to_millisecond(args, i, false);
213 };
214 let upper = s.to_uppercase();
215 match upper.as_str() {
216 "NOW" => return Ok(Timestamp::current_millis().value()),
217 "" => return Ok(timezone.map(|tz| tz.local_minus_utc() * 1000).unwrap_or(0)),
219 _ => (),
220 }
221
222 Timestamp::from_str(s, timezone)
223 .map_err(|e| {
224 DataFusionError::Plan(format!(
225 "Illegal `align to` argument `{}` in range select query, can't be parse as NOW/CALENDAR/Timestamp, error: {}",
226 s, e
227 ))
228 })?.convert_to(TimeUnit::Millisecond).map(|x|x.value()).ok_or(DataFusionError::Plan(format!(
229 "Illegal `align to` argument `{}` in range select query, can't be convert to a valid Timestamp",
230 s
231 ))
232 )
233}
234
235fn parse_expr_list(args: &[Expr], start: usize, len: usize) -> DFResult<Vec<Expr>> {
236 let mut outs = Vec::with_capacity(len);
237 for i in start..start + len {
238 outs.push(match &args.get(i) {
239 Some(
240 Expr::Column(_)
241 | Expr::Literal(_, _)
242 | Expr::BinaryExpr(_)
243 | Expr::ScalarFunction(_),
244 ) => args[i].clone(),
245 Some(Expr::Alias(alias)) if matches!(*alias.expr, Expr::ScalarFunction(_)) => {
246 args[i].clone()
247 }
248 other => {
249 return Err(dispose_parse_error(*other));
250 }
251 });
252 }
253 Ok(outs)
254}
255
256macro_rules! inconsistent_check {
257 ($self: ident.$name: ident, $cond: expr) => {
258 if $cond && $self.$name != $name {
259 return Err(DataFusionError::Plan(
260 concat!(
261 "Inconsistent ",
262 stringify!($name),
263 " given in Range Function Rewrite"
264 )
265 .into(),
266 ));
267 } else {
268 $self.$name = $name;
269 }
270 };
271}
272
273impl TreeNodeRewriter for RangeExprRewriter<'_> {
274 type Node = Expr;
275
276 fn f_down(&mut self, node: Expr) -> DFResult<Transformed<Expr>> {
277 if let Expr::ScalarFunction(func) = &node
278 && func.name() == "range_fn"
279 {
280 let range_expr = self.get_range_expr(&func.args, 0)?;
283 let range = parse_duration_expr(&func.args, 1)?;
284 let byc = str::parse::<usize>(parse_str_expr(&func.args, 3)?)
285 .map_err(|e| DataFusionError::Plan(e.to_string()))?;
286 let by = parse_expr_list(&func.args, 4, byc)?;
287 let align = parse_duration_expr(&func.args, byc + 4)?;
288 let align_to = parse_align_to(&func.args, byc + 5, Some(&self.query_ctx.timezone()))?;
289 let mut data_type = range_expr.get_type(self.input_plan.schema())?;
290 let mut need_cast = false;
291 let fill = Fill::try_from_str(parse_str_expr(&func.args, 2)?, &data_type)?;
292 if matches!(fill, Some(Fill::Linear)) && data_type.is_integer() {
293 data_type = DataType::Float64;
294 need_cast = true;
295 }
296 inconsistent_check!(self.by, !self.by.is_empty());
297 inconsistent_check!(self.align, self.align != Duration::default());
298 inconsistent_check!(self.align_to, self.align_to != 0);
299 let range_fn = RangeFn {
300 name: if let Some(fill) = &fill {
301 format!(
302 "{} RANGE {} FILL {}",
303 range_expr.schema_name(),
304 parse_expr_to_string(&func.args, 1)?,
305 fill
306 )
307 } else {
308 format!(
309 "{} RANGE {}",
310 range_expr.schema_name(),
311 parse_expr_to_string(&func.args, 1)?,
312 )
313 },
314 data_type,
315 expr: range_expr,
316 range,
317 fill,
318 need_cast,
319 };
320 let alias = Expr::Column(Column::from_name(range_fn.name.clone()));
321 self.range_fn.insert(range_fn);
322 return Ok(Transformed::yes(alias));
323 }
324 Ok(Transformed::no(node))
325 }
326}
327
328pub struct RangePlanRewriter {
335 table_provider: DfTableSourceProvider,
336 query_ctx: QueryContextRef,
337}
338
339impl RangePlanRewriter {
340 pub fn new(table_provider: DfTableSourceProvider, query_ctx: QueryContextRef) -> Self {
341 Self {
342 table_provider,
343 query_ctx,
344 }
345 }
346
347 pub async fn rewrite(&mut self, plan: LogicalPlan) -> Result<LogicalPlan> {
348 match self.rewrite_logical_plan(&plan).await? {
349 Some(new_plan) => Ok(new_plan),
350 None => Ok(plan),
351 }
352 }
353
354 #[async_recursion]
355 async fn rewrite_logical_plan(&mut self, plan: &LogicalPlan) -> Result<Option<LogicalPlan>> {
356 let inputs = plan.inputs();
357 let mut new_inputs = Vec::with_capacity(inputs.len());
358 for input in &inputs {
359 new_inputs.push(self.rewrite_logical_plan(input).await?)
360 }
361 match plan {
362 LogicalPlan::Projection(Projection { expr, input, .. })
363 if have_range_in_exprs(expr) =>
364 {
365 let (aggr_plan, input) = if let LogicalPlan::Aggregate(aggr) = input.as_ref() {
366 if have_range_in_exprs(&aggr.aggr_expr) {
368 return RangeQuerySnafu {
369 msg: "Nest Range Query is not allowed",
370 }
371 .fail();
372 }
373 (aggr, aggr.input.clone())
374 } else {
375 return RangeQuerySnafu {
376 msg: "Window functions is not allowed in Range Query",
377 }
378 .fail();
379 };
380 let query_ctx = self.query_ctx.clone();
381 let mut range_rewriter = RangeExprRewriter {
382 input_plan: &input,
383 align: Duration::default(),
384 align_to: 0,
385 by: vec![],
386 range_fn: BTreeSet::new(),
387 sub_aggr: aggr_plan,
388 query_ctx: &query_ctx,
389 };
390 let new_expr = expr
391 .iter()
392 .map(|expr| expr.clone().rewrite(&mut range_rewriter).map(|x| x.data))
393 .collect::<DFResult<Vec<_>>>()?;
394 let need_default_by = range_rewriter.by.is_empty();
395 let (time_index, default_by) =
396 self.get_index_by(input.schema(), need_default_by).await?;
397 if need_default_by {
398 range_rewriter.by = default_by;
399 }
400 let range_select = RangeSelect::try_new(
401 input.clone(),
402 range_rewriter.range_fn.into_iter().collect(),
403 range_rewriter.align,
404 range_rewriter.align_to,
405 time_index,
406 range_rewriter.by,
407 &new_expr,
408 )?;
409 let no_additional_project = range_select.schema_project.is_some();
410 let range_plan = LogicalPlan::Extension(Extension {
411 node: Arc::new(range_select),
412 });
413 if no_additional_project {
414 Ok(Some(range_plan))
415 } else {
416 let project_plan = LogicalPlanBuilder::from(range_plan)
417 .project(new_expr)
418 .and_then(|x| x.build())?;
419 Ok(Some(project_plan))
420 }
421 }
422 _ => {
423 if new_inputs.iter().any(|x| x.is_some()) {
424 let inputs: Vec<LogicalPlan> = new_inputs
425 .into_iter()
426 .zip(inputs)
427 .map(|(x, y)| match x {
428 Some(plan) => plan,
429 None => y.clone(),
430 })
431 .collect();
432 let plan = match plan {
436 LogicalPlan::Analyze(Analyze { verbose, .. }) => {
437 ensure!(
438 inputs.len() == 1,
439 RangeQuerySnafu {
440 msg: "Illegal subplan nums when rewrite Analyze logical plan",
441 }
442 );
443 LogicalPlanBuilder::from(inputs[0].clone())
444 .explain(*verbose, true)?
445 .build()
446 }
447 LogicalPlan::Explain(Explain { verbose, .. }) => {
448 ensure!(
449 inputs.len() == 1,
450 RangeQuerySnafu {
451 msg: "Illegal subplan nums when rewrite Explain logical plan",
452 }
453 );
454 LogicalPlanBuilder::from(inputs[0].clone())
455 .explain(*verbose, false)?
456 .build()
457 }
458 LogicalPlan::Distinct(Distinct::On(DistinctOn {
459 on_expr,
460 select_expr,
461 sort_expr,
462 ..
463 })) => {
464 ensure!(
465 inputs.len() == 1,
466 RangeQuerySnafu {
467 msg: "Illegal subplan nums when rewrite DistinctOn logical plan",
468 }
469 );
470 LogicalPlanBuilder::from(inputs[0].clone())
471 .distinct_on(
472 on_expr.clone(),
473 select_expr.clone(),
474 sort_expr.clone(),
475 )?
476 .build()
477 }
478 _ => plan.with_new_exprs(plan.expressions_consider_join(), inputs),
479 }?;
480 Ok(Some(plan))
481 } else {
482 Ok(None)
483 }
484 }
485 }
486 }
487
488 async fn get_index_by(
498 &mut self,
499 schema: &Arc<DFSchema>,
500 need_default_by: bool,
501 ) -> Result<(Expr, Vec<Expr>)> {
502 #[allow(deprecated)]
503 let mut time_index_expr = Expr::Wildcard {
504 qualifier: None,
505 options: Box::new(WildcardOptions::default()),
506 };
507 let mut default_by = vec![];
508 let metadata_time_index_expr = (0..schema.fields().len()).find_map(|i| {
509 let (qualifier, field) = schema.qualified_field(i);
510 if field.metadata().contains_key(TIME_INDEX_KEY)
511 && matches!(field.data_type(), DataType::Timestamp(_, _))
512 {
513 Some(Expr::Column(Column::new(
514 qualifier.cloned(),
515 field.name().clone(),
516 )))
517 } else {
518 None
519 }
520 });
521 for i in 0..schema.fields().len() {
522 let (qualifier, _) = schema.qualified_field(i);
523 if let Some(table_ref) = qualifier {
524 let table_source = match self.table_provider.resolve_table(table_ref.clone()).await
525 {
526 Ok(table_source) => table_source,
527 Err(error) => {
528 if matches!(&error, catalog::error::Error::TableNotExist { .. })
532 && metadata_time_index_expr.is_some()
533 {
534 continue;
535 }
536 return Err(error).context(CatalogSnafu);
537 }
538 };
539 let table = table_source
540 .as_any()
541 .downcast_ref::<DefaultTableSource>()
542 .context(UnknownTableSnafu)?
543 .table_provider
544 .as_any()
545 .downcast_ref::<DfTableProviderAdapter>()
546 .context(UnknownTableSnafu)?
547 .table();
548 let schema = table.schema();
549 let time_index_column =
550 schema
551 .timestamp_column()
552 .with_context(|| TimeIndexNotFoundSnafu {
553 table: table_ref.to_string(),
554 })?;
555 if let ConcreteDataType::Timestamp(_) = time_index_column.data_type {
557 default_by = table
558 .table_info()
559 .meta
560 .row_key_column_names()
561 .map(|key| Expr::Column(Column::new(Some(table_ref.clone()), key)))
562 .collect();
563 if default_by.is_empty() {
567 default_by = vec![1.lit()];
568 }
569 time_index_expr = Expr::Column(Column::new(
570 Some(table_ref.clone()),
571 time_index_column.name.clone(),
572 ));
573 }
574 }
575 }
576 #[allow(deprecated)]
577 if matches!(time_index_expr, Expr::Wildcard { .. })
578 && let Some(expr) = metadata_time_index_expr
579 {
580 common_telemetry::debug!(
581 "Range query falling back to time-index metadata for derived input schema: {}",
582 schema
583 );
584 ensure!(
585 !need_default_by,
586 RangeQuerySnafu {
587 msg: "Cannot infer default BY columns from derived range query input"
588 }
589 );
590 time_index_expr = expr;
591 }
592 #[allow(deprecated)]
593 if matches!(time_index_expr, Expr::Wildcard { .. }) {
594 TimeIndexNotFoundSnafu {
595 table: schema.to_string(),
596 }
597 .fail()
598 } else {
599 Ok((time_index_expr, default_by))
600 }
601 }
602}
603
604fn have_range_in_exprs(exprs: &[Expr]) -> bool {
605 exprs.iter().any(|expr| {
606 let mut find_range = false;
607 let _ = expr.apply(|expr| {
608 Ok(match expr {
609 Expr::ScalarFunction(func) if func.name() == "range_fn" => {
610 find_range = true;
611 TreeNodeRecursion::Stop
612 }
613 _ => TreeNodeRecursion::Continue,
614 })
615 });
616 find_range
617 })
618}
619
620fn interval_only_in_expr(expr: &Expr) -> bool {
621 let mut all_interval = true;
622 let _ = expr.apply(|expr| {
623 if matches!(
625 expr,
626 Expr::Cast(Cast{
627 expr,
628 data_type: DataType::Interval(_)
629 }) if matches!(&**expr, Expr::Literal(ScalarValue::Utf8(_), _))
630 ) {
631 return Ok(TreeNodeRecursion::Stop);
634 }
635
636 if !matches!(
637 expr,
638 Expr::Literal(ScalarValue::IntervalDayTime(_), _)
639 | Expr::Literal(ScalarValue::IntervalMonthDayNano(_), _)
640 | Expr::Literal(ScalarValue::IntervalYearMonth(_), _)
641 | Expr::BinaryExpr(_)
642 | Expr::Cast(Cast {
643 data_type: DataType::Interval(_),
644 ..
645 })
646 ) {
647 all_interval = false;
648 Ok(TreeNodeRecursion::Stop)
649 } else {
650 Ok(TreeNodeRecursion::Continue)
651 }
652 });
653
654 all_interval
655}
656
657#[cfg(test)]
658mod test {
659
660 use arrow::datatypes::IntervalUnit;
661 use catalog::RegisterTableRequest;
662 use catalog::memory::MemoryCatalogManager;
663 use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
664 use common_time::IntervalYearMonth;
665 use datafusion_expr::{BinaryExpr, Literal, Operator};
666 use datatypes::prelude::ConcreteDataType;
667 use datatypes::schema::{ColumnSchema, Schema};
668 use session::context::QueryContext;
669 use table::metadata::{TableInfoBuilder, TableMetaBuilder};
670 use table::table::TableRef;
671 use table::test_util::EmptyTable;
672
673 use super::*;
674 use crate::options::QueryOptions;
675 use crate::parser::QueryLanguageParser;
676 use crate::{QueryEngineFactory, QueryEngineRef};
677
678 async fn create_test_engine() -> QueryEngineRef {
679 create_test_engine_with_tables(&["test"], false).await
680 }
681
682 async fn create_union_test_engine() -> QueryEngineRef {
683 create_test_engine_with_tables(&["test_0", "test_1"], true).await
684 }
685
686 async fn create_test_engine_with_tables(
687 table_names: &[&str],
688 with_extra_timestamp: bool,
689 ) -> QueryEngineRef {
690 let catalog_list = MemoryCatalogManager::with_default_setup();
691 for (i, table_name) in table_names.iter().enumerate() {
692 let table = create_test_table(table_name, with_extra_timestamp);
693 assert!(
694 catalog_list
695 .register_table_sync(RegisterTableRequest {
696 catalog: DEFAULT_CATALOG_NAME.to_string(),
697 schema: DEFAULT_SCHEMA_NAME.to_string(),
698 table_name: (*table_name).to_string(),
699 table_id: 1024 + i as u32,
700 table,
701 })
702 .is_ok()
703 );
704 }
705 QueryEngineFactory::new(
706 catalog_list,
707 None,
708 None,
709 None,
710 None,
711 false,
712 QueryOptions::default(),
713 )
714 .query_engine()
715 }
716
717 fn create_test_table(table_name: &str, with_extra_timestamp: bool) -> TableRef {
718 let mut columns = vec![];
719 for i in 0..5 {
720 columns.push(ColumnSchema::new(
721 format!("tag_{i}"),
722 ConcreteDataType::string_datatype(),
723 false,
724 ));
725 }
726 columns.push(
727 ColumnSchema::new(
728 "timestamp".to_string(),
729 ConcreteDataType::timestamp_millisecond_datatype(),
730 false,
731 )
732 .with_time_index(true),
733 );
734 if with_extra_timestamp {
735 columns.push(ColumnSchema::new(
736 "timestamp_2".to_string(),
737 ConcreteDataType::timestamp_millisecond_datatype(),
738 true,
739 ));
740 }
741 for i in 0..5 {
742 columns.push(ColumnSchema::new(
743 format!("field_{i}"),
744 ConcreteDataType::float64_datatype(),
745 true,
746 ));
747 }
748 let schema = Arc::new(Schema::new(columns));
749 let table_meta = TableMetaBuilder::empty()
750 .schema(schema)
751 .primary_key_indices((0..5).collect())
752 .value_indices(if with_extra_timestamp {
753 (6..12).collect()
754 } else {
755 (6..11).collect()
756 })
757 .next_column_id(1024)
758 .build()
759 .unwrap();
760 let table_info = TableInfoBuilder::default()
761 .name(table_name)
762 .meta(table_meta)
763 .build()
764 .unwrap();
765 EmptyTable::from_table_info(&table_info)
766 }
767
768 async fn do_query(sql: &str) -> Result<LogicalPlan> {
769 let stmt = QueryLanguageParser::parse_sql(sql, &QueryContext::arc()).unwrap();
770 let engine = create_test_engine().await;
771 engine.planner().plan(&stmt, QueryContext::arc()).await
772 }
773
774 async fn do_union_query(sql: &str) -> Result<LogicalPlan> {
775 let stmt = QueryLanguageParser::parse_sql(sql, &QueryContext::arc()).unwrap();
776 let engine = create_union_test_engine().await;
777 engine.planner().plan(&stmt, QueryContext::arc()).await
778 }
779
780 async fn query_plan_compare(sql: &str, expected: String) {
781 let plan = do_query(sql).await.unwrap();
782 assert_eq!(plan.display_indent_schema().to_string(), expected);
783 }
784
785 #[tokio::test]
786 async fn range_no_project() {
787 let query = r#"SELECT timestamp, tag_0, tag_1, avg(field_0 + field_1) RANGE '5m' FROM test ALIGN '1h' by (tag_0,tag_1);"#;
788 let expected = String::from(
789 "RangeSelect: range_exprs=[avg(test.field_0 + test.field_1) RANGE 5m], align=3600000ms, align_to=0ms, align_by=[test.tag_0, test.tag_1], time_index=timestamp [timestamp:Timestamp(ms), tag_0:Utf8, tag_1:Utf8, avg(test.field_0 + test.field_1) RANGE 5m:Float64;N]\
790 \n TableScan: test [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8, timestamp:Timestamp(ms), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N, field_3:Float64;N, field_4:Float64;N]",
791 );
792 query_plan_compare(query, expected).await;
793 }
794
795 #[tokio::test]
796 async fn range_expr_calculation() {
797 let query = r#"SELECT (avg(field_0 + field_1)/4) RANGE '5m' FROM test ALIGN '1h' by (tag_0,tag_1);"#;
798 let expected = String::from(
799 "Projection: avg(test.field_0 + test.field_1) RANGE 5m / Int64(4) [avg(test.field_0 + test.field_1) RANGE 5m / Int64(4):Float64;N]\
800 \n RangeSelect: range_exprs=[avg(test.field_0 + test.field_1) RANGE 5m], align=3600000ms, align_to=0ms, align_by=[test.tag_0, test.tag_1], time_index=timestamp [avg(test.field_0 + test.field_1) RANGE 5m:Float64;N, timestamp:Timestamp(ms), tag_0:Utf8, tag_1:Utf8]\
801 \n TableScan: test [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8, timestamp:Timestamp(ms), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N, field_3:Float64;N, field_4:Float64;N]",
802 );
803 query_plan_compare(query, expected).await;
804 }
805
806 #[tokio::test]
807 async fn range_multi_args() {
808 let query =
809 r#"SELECT (covar(field_0 + field_1, field_1)/4) RANGE '5m' FROM test ALIGN '1h';"#;
810 let expected = String::from(
811 "Projection: covar_samp(test.field_0 + test.field_1,test.field_1) RANGE 5m / Int64(4) [covar_samp(test.field_0 + test.field_1,test.field_1) RANGE 5m / Int64(4):Float64;N]\
812 \n RangeSelect: range_exprs=[covar_samp(test.field_0 + test.field_1,test.field_1) RANGE 5m], align=3600000ms, align_to=0ms, align_by=[test.tag_0, test.tag_1, test.tag_2, test.tag_3, test.tag_4], time_index=timestamp [covar_samp(test.field_0 + test.field_1,test.field_1) RANGE 5m:Float64;N, timestamp:Timestamp(ms), tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8]\
813 \n TableScan: test [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8, timestamp:Timestamp(ms), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N, field_3:Float64;N, field_4:Float64;N]",
814 );
815 query_plan_compare(query, expected).await;
816 }
817
818 #[tokio::test]
819 async fn range_calculation() {
820 let query = r#"SELECT ((avg(field_0)+sum(field_1))/4) RANGE '5m' FROM test ALIGN '1h' by (tag_0,tag_1) FILL NULL;"#;
821 let expected = String::from(
822 "Projection: (avg(test.field_0) RANGE 5m FILL NULL + sum(test.field_1) RANGE 5m FILL NULL) / Int64(4) [avg(test.field_0) RANGE 5m FILL NULL + sum(test.field_1) RANGE 5m FILL NULL / Int64(4):Float64;N]\
823 \n RangeSelect: range_exprs=[avg(test.field_0) RANGE 5m FILL NULL, sum(test.field_1) RANGE 5m FILL NULL], align=3600000ms, align_to=0ms, align_by=[test.tag_0, test.tag_1], time_index=timestamp [avg(test.field_0) RANGE 5m FILL NULL:Float64;N, sum(test.field_1) RANGE 5m FILL NULL:Float64;N, timestamp:Timestamp(ms), tag_0:Utf8, tag_1:Utf8]\
824 \n TableScan: test [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8, timestamp:Timestamp(ms), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N, field_3:Float64;N, field_4:Float64;N]",
825 );
826 query_plan_compare(query, expected).await;
827 }
828
829 #[tokio::test]
830 async fn range_as_sub_query() {
831 let query = r#"SELECT foo + 1 from (SELECT ((avg(field_0)+sum(field_1))/4) RANGE '5m' as foo FROM test ALIGN '1h' by (tag_0,tag_1) FILL NULL) where foo > 1;"#;
832 let expected = String::from(
833 "Projection: foo + Int64(1) [foo + Int64(1):Float64;N]\
834 \n Filter: foo > Int64(1) [foo:Float64;N]\
835 \n Projection: (avg(test.field_0) RANGE 5m FILL NULL + sum(test.field_1) RANGE 5m FILL NULL) / Int64(4) AS foo [foo:Float64;N]\
836 \n RangeSelect: range_exprs=[avg(test.field_0) RANGE 5m FILL NULL, sum(test.field_1) RANGE 5m FILL NULL], align=3600000ms, align_to=0ms, align_by=[test.tag_0, test.tag_1], time_index=timestamp [avg(test.field_0) RANGE 5m FILL NULL:Float64;N, sum(test.field_1) RANGE 5m FILL NULL:Float64;N, timestamp:Timestamp(ms), tag_0:Utf8, tag_1:Utf8]\
837 \n TableScan: test [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8, timestamp:Timestamp(ms), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N, field_3:Float64;N, field_4:Float64;N]",
838 );
839 query_plan_compare(query, expected).await;
840 }
841
842 #[tokio::test]
843 async fn range_from_nest_query() {
844 let query = r#"SELECT ((avg(a)+sum(b))/4) RANGE '5m' FROM (SELECT field_0 as a, field_1 as b, tag_0 as c, tag_1 as d, timestamp from test where field_0 > 1.0) ALIGN '1h' by (c, d) FILL NULL;"#;
845 let expected = String::from(
846 "Projection: (avg(a) RANGE 5m FILL NULL + sum(b) RANGE 5m FILL NULL) / Int64(4) [avg(a) RANGE 5m FILL NULL + sum(b) RANGE 5m FILL NULL / Int64(4):Float64;N]\
847 \n RangeSelect: range_exprs=[avg(a) RANGE 5m FILL NULL, sum(b) RANGE 5m FILL NULL], align=3600000ms, align_to=0ms, align_by=[c, d], time_index=timestamp [avg(a) RANGE 5m FILL NULL:Float64;N, sum(b) RANGE 5m FILL NULL:Float64;N, timestamp:Timestamp(ms), c:Utf8, d:Utf8]\
848 \n Projection: test.field_0 AS a, test.field_1 AS b, test.tag_0 AS c, test.tag_1 AS d, test.timestamp [a:Float64;N, b:Float64;N, c:Utf8, d:Utf8, timestamp:Timestamp(ms)]\
849 \n Filter: test.field_0 > Float64(1) [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8, timestamp:Timestamp(ms), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N, field_3:Float64;N, field_4:Float64;N]\
850 \n TableScan: test [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8, timestamp:Timestamp(ms), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N, field_3:Float64;N, field_4:Float64;N]",
851 );
852 query_plan_compare(query, expected).await;
853 }
854
855 #[tokio::test]
856 async fn range_from_union_query() {
857 let queries = [
858 r#"SELECT timestamp, tag_0, avg(field_0) RANGE '5m'
859 FROM (
860 SELECT timestamp, tag_0, field_0, timestamp_2 FROM test_0
861 UNION ALL
862 SELECT timestamp, tag_0, field_0, timestamp_2 FROM test_1
863 )
864 WHERE timestamp >= '1970-01-01 00:00:00'
865 ALIGN '1h' by (tag_0)"#,
866 r#"SELECT tmp.timestamp, tmp.tag_0, avg(tmp.field_0) RANGE '5m'
867 FROM (
868 SELECT timestamp, tag_0, field_0, timestamp_2 FROM test_0
869 UNION ALL
870 SELECT timestamp, tag_0, field_0, timestamp_2 FROM test_1
871 ) AS tmp
872 WHERE tmp.timestamp >= '1970-01-01 00:00:00'
873 ALIGN '1h' by (tmp.tag_0)"#,
874 ];
875
876 for query in queries {
877 let plan = do_union_query(query)
878 .await
879 .unwrap()
880 .display_indent_schema()
881 .to_string();
882
883 assert!(plan.contains("RangeSelect"));
884 assert!(plan.contains("Union"));
885 assert!(plan.contains("time_index=timestamp"));
886 }
887 }
888
889 #[tokio::test]
890 async fn range_from_derived_query_without_by_err() {
891 let queries = [
892 r#"SELECT timestamp, tag_0, avg(field_0) RANGE '5m'
893 FROM (
894 SELECT timestamp, tag_0, field_0, timestamp_2 FROM test_0
895 UNION ALL
896 SELECT timestamp, tag_0, field_0, timestamp_2 FROM test_1
897 )
898 WHERE timestamp >= '1970-01-01 00:00:00'
899 ALIGN '1h'"#,
900 r#"SELECT tmp.timestamp, tmp.tag_0, avg(tmp.field_0) RANGE '5m'
901 FROM (
902 SELECT timestamp, tag_0, field_0, timestamp_2 FROM test_0
903 UNION ALL
904 SELECT timestamp, tag_0, field_0, timestamp_2 FROM test_1
905 ) AS tmp
906 WHERE tmp.timestamp >= '1970-01-01 00:00:00'
907 ALIGN '1h'"#,
908 ];
909
910 for query in queries {
911 assert_eq!(
912 do_union_query(query).await.unwrap_err().to_string(),
913 "Range Query: Cannot infer default BY columns from derived range query input"
914 );
915 }
916 }
917
918 #[tokio::test]
919 async fn range_in_expr() {
920 let query = r#"SELECT sin(avg(field_0 + field_1) RANGE '5m' + 1) FROM test ALIGN '1h' by (tag_0,tag_1);"#;
921 let expected = String::from(
922 "Projection: sin(avg(test.field_0 + test.field_1) RANGE 5m + Int64(1)) [sin(avg(test.field_0 + test.field_1) RANGE 5m + Int64(1)):Float64;N]\
923 \n RangeSelect: range_exprs=[avg(test.field_0 + test.field_1) RANGE 5m], align=3600000ms, align_to=0ms, align_by=[test.tag_0, test.tag_1], time_index=timestamp [avg(test.field_0 + test.field_1) RANGE 5m:Float64;N, timestamp:Timestamp(ms), tag_0:Utf8, tag_1:Utf8]\
924 \n TableScan: test [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8, timestamp:Timestamp(ms), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N, field_3:Float64;N, field_4:Float64;N]",
925 );
926 query_plan_compare(query, expected).await;
927 }
928
929 #[tokio::test]
930 async fn duplicate_range_expr() {
931 let query = r#"SELECT avg(field_0) RANGE '5m' FILL 6.0 + avg(field_0) RANGE '5m' FILL 6.0 FROM test ALIGN '1h' by (tag_0,tag_1);"#;
932 let expected = String::from(
933 "Projection: avg(test.field_0) RANGE 5m FILL 6 + avg(test.field_0) RANGE 5m FILL 6 [avg(test.field_0) RANGE 5m FILL 6 + avg(test.field_0) RANGE 5m FILL 6:Float64]\
934 \n RangeSelect: range_exprs=[avg(test.field_0) RANGE 5m FILL 6], align=3600000ms, align_to=0ms, align_by=[test.tag_0, test.tag_1], time_index=timestamp [avg(test.field_0) RANGE 5m FILL 6:Float64, timestamp:Timestamp(ms), tag_0:Utf8, tag_1:Utf8]\
935 \n TableScan: test [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8, timestamp:Timestamp(ms), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N, field_3:Float64;N, field_4:Float64;N]",
936 );
937 query_plan_compare(query, expected).await;
938 }
939
940 #[tokio::test]
941 async fn deep_nest_range_expr() {
942 let query = r#"SELECT round(sin(avg(field_0 + field_1) RANGE '5m' + 1)) FROM test ALIGN '1h' by (tag_0,tag_1);"#;
943 let expected = String::from(
944 "Projection: round(sin(avg(test.field_0 + test.field_1) RANGE 5m + Int64(1))) [round(sin(avg(test.field_0 + test.field_1) RANGE 5m + Int64(1))):Float64;N]\
945 \n RangeSelect: range_exprs=[avg(test.field_0 + test.field_1) RANGE 5m], align=3600000ms, align_to=0ms, align_by=[test.tag_0, test.tag_1], time_index=timestamp [avg(test.field_0 + test.field_1) RANGE 5m:Float64;N, timestamp:Timestamp(ms), tag_0:Utf8, tag_1:Utf8]\
946 \n TableScan: test [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8, timestamp:Timestamp(ms), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N, field_3:Float64;N, field_4:Float64;N]",
947 );
948 query_plan_compare(query, expected).await;
949 }
950
951 #[tokio::test]
952 async fn complex_range_expr() {
953 let query = r#"SELECT gcd(CAST(max(field_0 + 1) Range '5m' FILL NULL AS Int64), CAST(tag_0 AS Int64)) + round(max(field_2+1) Range '6m' FILL NULL + 1) + max(field_2+3) Range '10m' FILL NULL * CAST(tag_1 AS Float64) + 1 FROM test ALIGN '1h' by (tag_0, tag_1);"#;
954 let expected = String::from(
955 "Projection: gcd(arrow_cast(max(test.field_0 + Int64(1)) RANGE 5m FILL NULL, Utf8(\"Int64\")), arrow_cast(test.tag_0, Utf8(\"Int64\"))) + round(max(test.field_2 + Int64(1)) RANGE 6m FILL NULL + Int64(1)) + max(test.field_2 + Int64(3)) RANGE 10m FILL NULL * arrow_cast(test.tag_1, Utf8(\"Float64\")) + Int64(1) [gcd(arrow_cast(max(test.field_0 + Int64(1)) RANGE 5m FILL NULL,Utf8(\"Int64\")),arrow_cast(test.tag_0,Utf8(\"Int64\"))) + round(max(test.field_2 + Int64(1)) RANGE 6m FILL NULL + Int64(1)) + max(test.field_2 + Int64(3)) RANGE 10m FILL NULL * arrow_cast(test.tag_1,Utf8(\"Float64\")) + Int64(1):Float64;N]\
956 \n RangeSelect: range_exprs=[max(test.field_0 + Int64(1)) RANGE 5m FILL NULL, max(test.field_2 + Int64(1)) RANGE 6m FILL NULL, max(test.field_2 + Int64(3)) RANGE 10m FILL NULL], align=3600000ms, align_to=0ms, align_by=[test.tag_0, test.tag_1], time_index=timestamp [max(test.field_0 + Int64(1)) RANGE 5m FILL NULL:Float64;N, max(test.field_2 + Int64(1)) RANGE 6m FILL NULL:Float64;N, max(test.field_2 + Int64(3)) RANGE 10m FILL NULL:Float64;N, timestamp:Timestamp(ms), tag_0:Utf8, tag_1:Utf8]\
957 \n TableScan: test [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8, timestamp:Timestamp(ms), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N, field_3:Float64;N, field_4:Float64;N]",
958 );
959 query_plan_compare(query, expected).await;
960 }
961
962 #[tokio::test]
963 async fn range_linear_on_integer() {
964 let query = r#"SELECT min(CAST(field_0 AS Int64) + CAST(field_1 AS Int64)) RANGE '5m' FILL LINEAR FROM test ALIGN '1h' by (tag_0,tag_1);"#;
965 let expected = String::from(
966 "RangeSelect: range_exprs=[min(arrow_cast(test.field_0,Utf8(\"Int64\")) + arrow_cast(test.field_1,Utf8(\"Int64\"))) RANGE 5m FILL LINEAR], align=3600000ms, align_to=0ms, align_by=[test.tag_0, test.tag_1], time_index=timestamp [min(arrow_cast(test.field_0,Utf8(\"Int64\")) + arrow_cast(test.field_1,Utf8(\"Int64\"))) RANGE 5m FILL LINEAR:Float64;N]\
967 \n TableScan: test [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8, timestamp:Timestamp(ms), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N, field_3:Float64;N, field_4:Float64;N]",
968 );
969 query_plan_compare(query, expected).await;
970 }
971
972 #[tokio::test]
973 async fn range_nest_range_err() {
974 let query = r#"SELECT sum(avg(field_0 + field_1) RANGE '5m' + 1) RANGE '5m' + 1 FROM test ALIGN '1h' by (tag_0,tag_1);"#;
975 assert_eq!(
976 do_query(query).await.unwrap_err().to_string(),
977 "Range Query: Nest Range Query is not allowed"
978 )
979 }
980
981 #[tokio::test]
982 async fn range_argument_err_1() {
985 let query = r#"SELECT range_fn('5m', avg(field_0), 'NULL', '1', tag_0, '1h') FROM test group by tag_0;"#;
986 let error = do_query(query).await.unwrap_err().to_string();
987 assert_eq!(
988 error,
989 "Error during planning: Illegal argument `Utf8(\"5m\")` in range select query"
990 )
991 }
992
993 #[tokio::test]
994 async fn range_argument_err_2() {
995 let query = r#"SELECT range_fn(avg(field_0), 5, 'NULL', '1', tag_0, '1h') FROM test group by tag_0;"#;
996 let error = do_query(query).await.unwrap_err().to_string();
997 assert_eq!(
998 error,
999 "Error during planning: Illegal argument `Int64(5)` in range select query"
1000 )
1001 }
1002
1003 #[test]
1004 fn test_parse_duration_expr() {
1005 let interval = IntervalYearMonth::new(10);
1007 let args = vec![ScalarValue::IntervalYearMonth(Some(interval.to_i32())).lit()];
1008 assert!(parse_duration_expr(&args, 0).is_err(),);
1009 let interval = IntervalDayTime::new(10, 10);
1011 let args = vec![ScalarValue::IntervalDayTime(Some(interval.into())).lit()];
1012 assert_eq!(
1013 parse_duration_expr(&args, 0).unwrap().as_millis() as i64,
1014 interval.as_millis()
1015 );
1016 let interval = IntervalMonthDayNano::new(0, 10, 10);
1018 let args = vec![ScalarValue::IntervalMonthDayNano(Some(interval.into())).lit()];
1019 assert_eq!(
1020 parse_duration_expr(&args, 0).unwrap().as_millis() as i64,
1021 interval.days as i64 * MS_PER_DAY + interval.nanoseconds / NANOS_PER_MILLI,
1022 );
1023 let args = vec!["1y4w".lit()];
1025 assert_eq!(
1026 parse_duration_expr(&args, 0).unwrap(),
1027 parse_duration("1y4w").unwrap()
1028 );
1029 let args = vec![Expr::Cast(Cast {
1031 expr: Box::new("15 minutes".lit()),
1032 data_type: DataType::Interval(IntervalUnit::MonthDayNano),
1033 })];
1034 assert_eq!(
1035 parse_duration_expr(&args, 0).unwrap(),
1036 parse_duration("15m").unwrap()
1037 );
1038 assert!(parse_duration_expr(&args, 10).is_err());
1040 let args = vec![Expr::BinaryExpr(BinaryExpr {
1042 left: Box::new(
1043 ScalarValue::IntervalDayTime(Some(IntervalDayTime::new(0, 10).into())).lit(),
1044 ),
1045 op: Operator::Plus,
1046 right: Box::new(
1047 ScalarValue::IntervalDayTime(Some(IntervalDayTime::new(0, 10).into())).lit(),
1048 ),
1049 })];
1050 assert_eq!(
1051 parse_duration_expr(&args, 0).unwrap(),
1052 Duration::from_millis(20)
1053 );
1054 let args = vec![Expr::BinaryExpr(BinaryExpr {
1055 left: Box::new(
1056 ScalarValue::IntervalDayTime(Some(IntervalDayTime::new(0, 10).into())).lit(),
1057 ),
1058 op: Operator::Minus,
1059 right: Box::new(
1060 ScalarValue::IntervalDayTime(Some(IntervalDayTime::new(0, 10).into())).lit(),
1061 ),
1062 })];
1063 assert!(parse_duration_expr(&args, 0).is_err());
1065 let args = vec![Expr::BinaryExpr(BinaryExpr {
1067 left: Box::new(
1068 ScalarValue::IntervalYearMonth(Some(IntervalYearMonth::new(10).to_i32())).lit(),
1069 ),
1070 op: Operator::Minus,
1071 right: Box::new(ScalarValue::Time64Microsecond(Some(0)).lit()),
1072 })];
1073 assert!(parse_duration_expr(&args, 0).is_err());
1074 }
1075
1076 #[test]
1077 fn test_parse_align_to() {
1078 let args = vec!["NOW".lit()];
1080 let epsinon = parse_align_to(&args, 0, None).unwrap() - Timestamp::current_millis().value();
1081 assert!(epsinon.abs() < 100);
1082 let args = vec!["".lit()];
1084 assert_eq!(0, parse_align_to(&args, 0, None).unwrap());
1085 let args = vec!["".lit()];
1087 assert_eq!(
1088 -36000 * 1000,
1089 parse_align_to(&args, 0, Some(&Timezone::from_tz_string("HST").unwrap())).unwrap()
1090 );
1091 assert_eq!(
1092 28800 * 1000,
1093 parse_align_to(
1094 &args,
1095 0,
1096 Some(&Timezone::from_tz_string("Asia/Shanghai").unwrap())
1097 )
1098 .unwrap()
1099 );
1100
1101 let args = vec!["1970-01-01T00:00:00+08:00".lit()];
1103 assert_eq!(parse_align_to(&args, 0, None).unwrap(), -8 * 60 * 60 * 1000);
1104 let args = vec!["1970-01-01T00:00:00".lit()];
1106 assert_eq!(
1107 parse_align_to(
1108 &args,
1109 0,
1110 Some(&Timezone::from_tz_string("Asia/Shanghai").unwrap())
1111 )
1112 .unwrap(),
1113 -8 * 60 * 60 * 1000
1114 );
1115 let args = vec![Expr::BinaryExpr(BinaryExpr {
1117 left: Box::new(
1118 ScalarValue::IntervalDayTime(Some(IntervalDayTime::new(0, 10).into())).lit(),
1119 ),
1120 op: Operator::Plus,
1121 right: Box::new(
1122 ScalarValue::IntervalDayTime(Some(IntervalDayTime::new(0, 10).into())).lit(),
1123 ),
1124 })];
1125 assert_eq!(parse_align_to(&args, 0, None).unwrap(), 20);
1126 }
1127
1128 #[test]
1129 fn test_interval_only() {
1130 let expr = Expr::BinaryExpr(BinaryExpr {
1131 left: Box::new(ScalarValue::DurationMillisecond(Some(20)).lit()),
1132 op: Operator::Minus,
1133 right: Box::new(
1134 ScalarValue::IntervalDayTime(Some(IntervalDayTime::new(10, 0).into())).lit(),
1135 ),
1136 });
1137 assert!(!interval_only_in_expr(&expr));
1138 let expr = Expr::BinaryExpr(BinaryExpr {
1139 left: Box::new(
1140 ScalarValue::IntervalDayTime(Some(IntervalDayTime::new(10, 0).into())).lit(),
1141 ),
1142 op: Operator::Minus,
1143 right: Box::new(
1144 ScalarValue::IntervalDayTime(Some(IntervalDayTime::new(10, 0).into())).lit(),
1145 ),
1146 });
1147 assert!(interval_only_in_expr(&expr));
1148
1149 let expr = Expr::BinaryExpr(BinaryExpr {
1150 left: Box::new(Expr::Cast(Cast {
1151 expr: Box::new("15 minute".lit()),
1152 data_type: DataType::Interval(IntervalUnit::MonthDayNano),
1153 })),
1154 op: Operator::Minus,
1155 right: Box::new(
1156 ScalarValue::IntervalDayTime(Some(IntervalDayTime::new(10, 0).into())).lit(),
1157 ),
1158 });
1159 assert!(interval_only_in_expr(&expr));
1160
1161 let expr = Expr::Cast(Cast {
1162 expr: Box::new(Expr::BinaryExpr(BinaryExpr {
1163 left: Box::new(Expr::Cast(Cast {
1164 expr: Box::new("15 minute".lit()),
1165 data_type: DataType::Interval(IntervalUnit::MonthDayNano),
1166 })),
1167 op: Operator::Minus,
1168 right: Box::new(
1169 ScalarValue::IntervalDayTime(Some(IntervalDayTime::new(10, 0).into())).lit(),
1170 ),
1171 })),
1172 data_type: DataType::Interval(IntervalUnit::MonthDayNano),
1173 });
1174
1175 assert!(interval_only_in_expr(&expr));
1176 }
1177}