use std::collections::BTreeSet;
use std::sync::Arc;
use std::time::Duration;
use arrow_schema::DataType;
use async_recursion::async_recursion;
use catalog::table_source::DfTableSourceProvider;
use chrono::Utc;
use common_time::interval::{MS_PER_DAY, NANOS_PER_MILLI};
use common_time::timestamp::TimeUnit;
use common_time::{IntervalDayTime, IntervalMonthDayNano, IntervalYearMonth, Timestamp, Timezone};
use datafusion::datasource::DefaultTableSource;
use datafusion::prelude::Column;
use datafusion::scalar::ScalarValue;
use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRecursion, TreeNodeRewriter};
use datafusion_common::{DFSchema, DataFusionError, Result as DFResult};
use datafusion_expr::execution_props::ExecutionProps;
use datafusion_expr::simplify::SimplifyContext;
use datafusion_expr::{
Aggregate, Analyze, Explain, Expr, ExprSchemable, Extension, LogicalPlan, LogicalPlanBuilder,
Projection,
};
use datafusion_optimizer::simplify_expressions::ExprSimplifier;
use datatypes::prelude::ConcreteDataType;
use promql_parser::util::parse_duration;
use session::context::QueryContextRef;
use snafu::{ensure, OptionExt, ResultExt};
use table::table::adapter::DfTableProviderAdapter;
use super::plan::Fill;
use crate::error::{
CatalogSnafu, DataFusionSnafu, RangeQuerySnafu, Result, TimeIndexNotFoundSnafu,
UnknownTableSnafu,
};
use crate::range_select::plan::{RangeFn, RangeSelect};
/// Expression rewriter that replaces scalar calls named `range_fn` with
/// column references, while collecting the parsed range functions and the
/// query-wide `by`/`align`/`align_to` settings for later plan construction.
pub struct RangeExprRewriter<'a> {
    /// Input of the sub-aggregate plan; used to resolve expression data types.
    input_plan: &'a Arc<LogicalPlan>,
    /// Align interval shared by every `range_fn` in the query
    /// (zero until the first call is parsed).
    align: Duration,
    /// Align origin in milliseconds shared by every `range_fn`
    /// (zero until the first call is parsed).
    align_to: i64,
    /// `BY` expressions; empty until the first `range_fn` provides them.
    by: Vec<Expr>,
    /// Collected range functions; the set deduplicates identical ones.
    range_fn: BTreeSet<RangeFn>,
    /// The `Aggregate` plan beneath the projection being rewritten.
    sub_aggr: &'a Aggregate,
    /// Session context supplying the timezone for `ALIGN TO` parsing.
    query_ctx: &'a QueryContextRef,
}
impl RangeExprRewriter<'_> {
    /// Resolves the `i`-th argument of a `range_fn` call — expected to be a
    /// column produced by the underlying `Aggregate` plan — back to the
    /// aggregate expression that computes it.
    ///
    /// # Errors
    /// Returns a plan error when the argument is missing, is not a column,
    /// or does not refer to an aggregate expression of the sub-plan.
    pub fn get_range_expr(&self, args: &[Expr], i: usize) -> DFResult<Expr> {
        match args.get(i) {
            Some(Expr::Column(column)) => {
                // The sub-aggregate schema lays columns out as
                // `[group exprs..., aggr exprs...]`, so subtract the group
                // expr count to index into `aggr_expr`. `checked_sub` turns a
                // group-by column (index < len) into a clean plan error
                // instead of a debug-mode subtraction overflow panic.
                let index = self.sub_aggr.schema.index_of_column(column)?;
                let len = self.sub_aggr.group_expr.len();
                index
                    .checked_sub(len)
                    .and_then(|idx| self.sub_aggr.aggr_expr.get(idx))
                    .cloned()
                    // `ok_or_else` defers the error-string allocation to the
                    // failure path.
                    .ok_or_else(|| {
                        DataFusionError::Plan(
                            "Range expr not found in underlying Aggregate Plan".into(),
                        )
                    })
            }
            other => Err(dispose_parse_error(other)),
        }
    }
}
#[inline]
/// Builds the uniform plan error for an illegal or missing `range_fn`
/// argument, embedding the offending expression's display name when present.
fn dispose_parse_error(expr: Option<&Expr>) -> DataFusionError {
    let msg = match expr {
        Some(x) => format!(
            "Illegal argument `{}` in range select query",
            x.display_name().unwrap_or_default()
        ),
        None => "Missing argument in range select query".to_string(),
    };
    DataFusionError::Plan(msg)
}
/// Extracts the `i`-th argument as a borrowed string; only a non-null UTF-8
/// literal is accepted.
fn parse_str_expr(args: &[Expr], i: usize) -> DFResult<&str> {
    let arg = args.get(i);
    if let Some(Expr::Literal(ScalarValue::Utf8(Some(s)))) = arg {
        Ok(s.as_str())
    } else {
        Err(dispose_parse_error(arg))
    }
}
/// Renders the `i`-th argument as an owned string: the literal value for a
/// non-null UTF-8 literal, otherwise the expression's display name.
fn parse_expr_to_string(args: &[Expr], i: usize) -> DFResult<String> {
    let arg = args.get(i).ok_or_else(|| dispose_parse_error(None))?;
    match arg {
        Expr::Literal(ScalarValue::Utf8(Some(s))) => Ok(s.clone()),
        other => Ok(other.display_name().unwrap_or_default()),
    }
}
/// Parses the `i`-th argument into a positive [`Duration`].
///
/// A UTF-8 literal is parsed with the PromQL duration grammar (e.g. `"5m"`);
/// any other expression is constant-folded to milliseconds and must consist
/// of interval literals only. Non-positive results are rejected.
fn parse_duration_expr(args: &[Expr], i: usize) -> DFResult<Duration> {
    let Some(expr) = args.get(i) else {
        return Err(dispose_parse_error(None));
    };
    if let Expr::Literal(ScalarValue::Utf8(Some(s))) = expr {
        return parse_duration(s).map_err(DataFusionError::Plan);
    }
    // Not a string literal: evaluate it, restricted to interval arithmetic.
    let ms = evaluate_expr_to_millisecond(args, i, true)?;
    if ms <= 0 {
        Err(dispose_parse_error(Some(expr)))
    } else {
        Ok(Duration::from_millis(ms as u64))
    }
}
/// Constant-folds the `i`-th argument with DataFusion's simplifier and
/// converts the resulting literal to milliseconds.
///
/// With `interval_only` set, the expression must be built exclusively from
/// interval literals and binary operators (see [`interval_only_in_expr`]).
/// Year/month intervals are rejected because their millisecond length is
/// not fixed.
fn evaluate_expr_to_millisecond(args: &[Expr], i: usize, interval_only: bool) -> DFResult<i64> {
    let Some(expr) = args.get(i) else {
        return Err(dispose_parse_error(None));
    };
    if interval_only && !interval_only_in_expr(expr) {
        return Err(dispose_parse_error(Some(expr)));
    }
    // Fold to a literal using the current wall clock as the query start time
    // (an empty schema suffices: no column references are expected here).
    let execution_props = ExecutionProps::new().with_query_execution_start_time(Utc::now());
    let info = SimplifyContext::new(&execution_props).with_schema(Arc::new(DFSchema::empty()));
    let simplify_expr = ExprSimplifier::new(info).simplify(expr.clone())?;
    match simplify_expr {
        // Timestamp/Duration literals: normalize each unit to milliseconds.
        Expr::Literal(ScalarValue::TimestampNanosecond(ts_nanos, _))
        | Expr::Literal(ScalarValue::DurationNanosecond(ts_nanos)) => {
            ts_nanos.map(|v| v / 1_000_000)
        }
        Expr::Literal(ScalarValue::TimestampMicrosecond(ts_micros, _))
        | Expr::Literal(ScalarValue::DurationMicrosecond(ts_micros)) => {
            ts_micros.map(|v| v / 1_000)
        }
        Expr::Literal(ScalarValue::TimestampMillisecond(ts_millis, _))
        | Expr::Literal(ScalarValue::DurationMillisecond(ts_millis)) => ts_millis,
        Expr::Literal(ScalarValue::TimestampSecond(ts_secs, _))
        | Expr::Literal(ScalarValue::DurationSecond(ts_secs)) => ts_secs.map(|v| v * 1_000),
        // Year/month intervals are only legal when they fold to zero months.
        Expr::Literal(ScalarValue::IntervalYearMonth(interval)) => interval
            .map(|v| {
                let interval = IntervalYearMonth::from_i32(v);
                if interval.months != 0 {
                    return Err(DataFusionError::Plan(format!(
                        "Year or month interval is not allowed in range query: {}",
                        expr.display_name().unwrap_or_default()
                    )));
                }
                Ok(0)
            })
            .transpose()?,
        Expr::Literal(ScalarValue::IntervalDayTime(interval)) => interval.map(|v| {
            let interval = IntervalDayTime::from_i64(v);
            interval.as_millis()
        }),
        // Month-day-nano intervals: reject a month component, convert the
        // day and nanosecond parts to milliseconds.
        Expr::Literal(ScalarValue::IntervalMonthDayNano(interval)) => interval
            .map(|v| {
                let interval = IntervalMonthDayNano::from_i128(v);
                if interval.months != 0 {
                    return Err(DataFusionError::Plan(format!(
                        "Year or month interval is not allowed in range query: {}",
                        expr.display_name().unwrap_or_default()
                    )));
                }
                Ok(interval.days as i64 * MS_PER_DAY + interval.nanoseconds / NANOS_PER_MILLI)
            })
            .transpose()?,
        // Anything that did not fold to a supported literal is rejected below.
        _ => None,
    }
    .ok_or_else(|| {
        DataFusionError::Plan(format!(
            "{} is not a expr can be evaluate and use in range query",
            expr.display_name().unwrap_or_default()
        ))
    })
}
/// Parses the `ALIGN TO` argument into an origin timestamp in milliseconds.
///
/// Accepted forms:
/// * a non-string expression — constant-folded to milliseconds;
/// * `"NOW"` (case-insensitive) — the current wall-clock time;
/// * `""` — calendar alignment, i.e. the session timezone's UTC offset;
/// * any other string — parsed as a timestamp in the given timezone.
///
/// # Errors
/// Returns a plan error when the string can be parsed neither as
/// NOW/CALENDAR nor as a timestamp, or cannot be converted to milliseconds.
fn parse_align_to(args: &[Expr], i: usize, timezone: Option<&Timezone>) -> DFResult<i64> {
    let Ok(s) = parse_str_expr(args, i) else {
        // Not a string literal: evaluate the expression directly
        // (intervals are not required here, hence `interval_only = false`).
        return evaluate_expr_to_millisecond(args, i, false);
    };
    match s.to_uppercase().as_str() {
        "NOW" => return Ok(Timestamp::current_millis().value()),
        // Empty string selects calendar alignment: offset by the timezone.
        "" => return Ok(timezone.map(|tz| tz.local_minus_utc() * 1000).unwrap_or(0)),
        _ => (),
    }
    let ts = Timestamp::from_str(s, timezone).map_err(|e| {
        DataFusionError::Plan(format!(
            "Illegal `align to` argument `{}` in range select query, can't be parse as NOW/CALENDAR/Timestamp, error: {}",
            s, e
        ))
    })?;
    ts.convert_to(TimeUnit::Millisecond)
        .map(|x| x.value())
        // `ok_or_else` avoids formatting the error message on success.
        .ok_or_else(|| {
            DataFusionError::Plan(format!(
                "Illegal `align to` argument `{}` in range select query, can't be convert to a valid Timestamp",
                s
            ))
        })
}
/// Collects `len` expressions starting at `start` into an owned list.
/// Only columns, literals, binary expressions and scalar functions are
/// accepted; anything else (or a missing argument) is a plan error.
fn parse_expr_list(args: &[Expr], start: usize, len: usize) -> DFResult<Vec<Expr>> {
    let mut outs = Vec::with_capacity(len);
    for i in start..start + len {
        // A single `get` with an `expr @` binding replaces the original
        // `&args.get(i)` double-reference plus redundant `args[i]` re-index.
        match args.get(i) {
            Some(
                expr @ (Expr::Column(_)
                | Expr::Literal(_)
                | Expr::BinaryExpr(_)
                | Expr::ScalarFunction(_)),
            ) => outs.push(expr.clone()),
            other => return Err(dispose_parse_error(other)),
        }
    }
    Ok(outs)
}
/// Enforces that every `range_fn` in one query agrees on the shared setting
/// `$name` (`by`, `align` or `align_to`): when the rewriter field was
/// already set (`$cond`) and differs from the freshly parsed value, planning
/// fails; otherwise the new value is recorded.
///
/// NOTE: relies on a local variable with the same identifier as the field
/// (e.g. `align`) being in scope at the call site — `$name` is used both as
/// a field name and as that local.
macro_rules! inconsistent_check {
    ($self: ident.$name: ident, $cond: expr) => {
        if $cond && $self.$name != $name {
            return Err(DataFusionError::Plan(
                concat!(
                    "Inconsistent ",
                    stringify!($name),
                    " given in Range Function Rewrite"
                )
                .into(),
            ));
        } else {
            $self.$name = $name;
        }
    };
}
impl TreeNodeRewriter for RangeExprRewriter<'_> {
    type Node = Expr;
    /// Intercepts `range_fn` marker calls while walking down the expression
    /// tree, records the parsed [`RangeFn`], and replaces each call with a
    /// column reference named after the range expression.
    fn f_down(&mut self, node: Expr) -> DFResult<Transformed<Expr>> {
        if let Expr::ScalarFunction(func) = &node {
            if func.name() == "range_fn" {
                // `range_fn` argument layout (positional):
                //   0          : aggregate expr (column of the sub-aggregate)
                //   1          : range duration
                //   2          : fill strategy
                //   3          : count of `by` exprs (as a string)
                //   4..4+byc   : the `by` exprs
                //   byc + 4    : align duration
                //   byc + 5    : align-to origin
                let range_expr = self.get_range_expr(&func.args, 0)?;
                let range = parse_duration_expr(&func.args, 1)?;
                let byc = str::parse::<usize>(parse_str_expr(&func.args, 3)?)
                    .map_err(|e| DataFusionError::Plan(e.to_string()))?;
                let by = parse_expr_list(&func.args, 4, byc)?;
                let align = parse_duration_expr(&func.args, byc + 4)?;
                let align_to =
                    parse_align_to(&func.args, byc + 5, Some(&self.query_ctx.timezone()))?;
                let mut data_type = range_expr.get_type(self.input_plan.schema())?;
                let mut need_cast = false;
                let fill = Fill::try_from_str(parse_str_expr(&func.args, 2)?, &data_type)?;
                // Linear interpolation over integer data must produce floats,
                // so the output type is widened and a cast is flagged.
                if matches!(fill, Some(Fill::Linear)) && data_type.is_integer() {
                    data_type = DataType::Float64;
                    need_cast = true;
                }
                // Every range_fn in the query must share by/align/align_to.
                inconsistent_check!(self.by, !self.by.is_empty());
                inconsistent_check!(self.align, self.align != Duration::default());
                inconsistent_check!(self.align_to, self.align_to != 0);
                let range_fn = RangeFn {
                    // The generated name doubles as the output column name,
                    // e.g. `AVG(x) RANGE 5m FILL NULL`.
                    name: if let Some(fill) = &fill {
                        format!(
                            "{} RANGE {} FILL {}",
                            range_expr.display_name()?,
                            parse_expr_to_string(&func.args, 1)?,
                            fill
                        )
                    } else {
                        format!(
                            "{} RANGE {}",
                            range_expr.display_name()?,
                            parse_expr_to_string(&func.args, 1)?,
                        )
                    },
                    data_type,
                    expr: range_expr,
                    range,
                    fill,
                    need_cast,
                };
                // The marker call is replaced by a reference to the column
                // that RangeSelect will later produce under this name.
                let alias = Expr::Column(Column::from_name(range_fn.name.clone()));
                self.range_fn.insert(range_fn);
                return Ok(Transformed::yes(alias));
            }
        }
        Ok(Transformed::no(node))
    }
}
/// Plan-level rewriter that walks a logical plan and replaces range-query
/// projections with `RangeSelect` extension nodes.
pub struct RangePlanRewriter {
    /// Resolves table references to obtain time-index / primary-key info.
    table_provider: DfTableSourceProvider,
    /// Session context (timezone, etc.) forwarded to expression parsing.
    query_ctx: QueryContextRef,
}
impl RangePlanRewriter {
    pub fn new(table_provider: DfTableSourceProvider, query_ctx: QueryContextRef) -> Self {
        Self {
            table_provider,
            query_ctx,
        }
    }
    /// Entry point: rewrites `plan` when it contains a range query,
    /// otherwise returns the original plan unchanged.
    pub async fn rewrite(&mut self, plan: LogicalPlan) -> Result<LogicalPlan> {
        match self.rewrite_logical_plan(&plan).await? {
            Some(new_plan) => Ok(new_plan),
            None => Ok(plan),
        }
    }
    /// Recursively rewrites the plan tree bottom-up. Returns `Ok(None)` when
    /// neither this node nor any of its inputs needed rewriting.
    #[async_recursion]
    async fn rewrite_logical_plan(&mut self, plan: &LogicalPlan) -> Result<Option<LogicalPlan>> {
        let inputs = plan.inputs();
        let mut new_inputs = Vec::with_capacity(inputs.len());
        // Rewrite children first; `None` marks an unchanged child.
        for input in &inputs {
            new_inputs.push(self.rewrite_logical_plan(input).await?)
        }
        match plan {
            // A projection containing `range_fn` markers anchors a range
            // query: its underlying Aggregate becomes a RangeSelect node.
            LogicalPlan::Projection(Projection { expr, input, .. })
                if have_range_in_exprs(expr) =>
            {
                let (aggr_plan, input) = if let LogicalPlan::Aggregate(aggr) = input.as_ref() {
                    // `range_fn` inside the aggregate exprs would mean a
                    // range query nested in another range query.
                    if have_range_in_exprs(&aggr.aggr_expr) {
                        return RangeQuerySnafu {
                            msg: "Nest Range Query is not allowed",
                        }
                        .fail();
                    }
                    (aggr, aggr.input.clone())
                } else {
                    return RangeQuerySnafu {
                        msg: "Window functions is not allowed in Range Query",
                    }
                    .fail();
                };
                let (time_index, default_by) = self.get_index_by(input.schema()).await?;
                let mut range_rewriter = RangeExprRewriter {
                    input_plan: &input,
                    align: Duration::default(),
                    align_to: 0,
                    by: vec![],
                    range_fn: BTreeSet::new(),
                    sub_aggr: aggr_plan,
                    query_ctx: &self.query_ctx,
                };
                // Rewrite every projection expr, collecting range functions
                // and shared settings into `range_rewriter`.
                let new_expr = expr
                    .iter()
                    .map(|expr| expr.clone().rewrite(&mut range_rewriter).map(|x| x.data))
                    .collect::<DFResult<Vec<_>>>()
                    .context(DataFusionSnafu)?;
                // No explicit `BY` clause: fall back to the table's
                // primary-key columns resolved above.
                if range_rewriter.by.is_empty() {
                    range_rewriter.by = default_by;
                }
                let range_select = RangeSelect::try_new(
                    input.clone(),
                    range_rewriter.range_fn.into_iter().collect(),
                    range_rewriter.align,
                    range_rewriter.align_to,
                    time_index,
                    range_rewriter.by,
                    &new_expr,
                )?;
                let no_additional_project = range_select.schema_project.is_some();
                let range_plan = LogicalPlan::Extension(Extension {
                    node: Arc::new(range_select),
                });
                // When RangeSelect already projects the exact output schema,
                // the extra Projection node is skipped.
                if no_additional_project {
                    Ok(Some(range_plan))
                } else {
                    let project_plan = LogicalPlanBuilder::from(range_plan)
                        .project(new_expr)
                        .context(DataFusionSnafu)?
                        .build()
                        .context(DataFusionSnafu)?;
                    Ok(Some(project_plan))
                }
            }
            _ => {
                // Not a range anchor: rebuild this node only if some child
                // changed; otherwise report "unchanged" to the caller.
                if new_inputs.iter().any(|x| x.is_some()) {
                    // Merge rewritten children with unchanged originals.
                    let inputs: Vec<LogicalPlan> = new_inputs
                        .into_iter()
                        .zip(inputs)
                        .map(|(x, y)| match x {
                            Some(plan) => plan,
                            None => y.clone(),
                        })
                        .collect();
                    let plan = match plan {
                        // Analyze/Explain are reconstructed through the
                        // builder so their output schema stays consistent.
                        LogicalPlan::Analyze(Analyze { verbose, .. }) => {
                            ensure!(
                                inputs.len() == 1,
                                RangeQuerySnafu {
                                    msg: "Illegal subplan nums when rewrite Analyze logical plan",
                                }
                            );
                            LogicalPlanBuilder::from(inputs[0].clone())
                                .explain(*verbose, true)
                                .context(DataFusionSnafu)?
                                .build()
                        }
                        LogicalPlan::Explain(Explain { verbose, .. }) => {
                            ensure!(
                                inputs.len() == 1,
                                RangeQuerySnafu {
                                    msg: "Illegal subplan nums when rewrite Explain logical plan",
                                }
                            );
                            LogicalPlanBuilder::from(inputs[0].clone())
                                .explain(*verbose, false)
                                .context(DataFusionSnafu)?
                                .build()
                        }
                        _ => plan.with_new_exprs(plan.expressions(), inputs),
                    }
                    .context(DataFusionSnafu)?;
                    Ok(Some(plan))
                } else {
                    Ok(None)
                }
            }
        }
    }
    /// Resolves the time-index column and the default `BY` columns (the
    /// table's primary key) from the qualified fields of `schema`.
    async fn get_index_by(&mut self, schema: &Arc<DFSchema>) -> Result<(Expr, Vec<Expr>)> {
        // Wildcard serves as a sentinel for "time index not found yet".
        let mut time_index_expr = Expr::Wildcard { qualifier: None };
        let mut default_by = vec![];
        for i in 0..schema.fields().len() {
            let (qualifier, _) = schema.qualified_field(i);
            if let Some(table_ref) = qualifier {
                // Downcast through the source-provider adapters to reach the
                // concrete table and its metadata.
                let table = self
                    .table_provider
                    .resolve_table(table_ref.clone())
                    .await
                    .context(CatalogSnafu)?
                    .as_any()
                    .downcast_ref::<DefaultTableSource>()
                    .context(UnknownTableSnafu)?
                    .table_provider
                    .as_any()
                    .downcast_ref::<DfTableProviderAdapter>()
                    .context(UnknownTableSnafu)?
                    .table();
                let schema = table.schema();
                let time_index_column =
                    schema
                        .timestamp_column()
                        .with_context(|| TimeIndexNotFoundSnafu {
                            table: table_ref.to_string(),
                        })?;
                if let ConcreteDataType::Timestamp(_) = time_index_column.data_type {
                    default_by = table
                        .table_info()
                        .meta
                        .row_key_column_names()
                        .map(|key| Expr::Column(Column::new(Some(table_ref.clone()), key)))
                        .collect();
                    // A table without primary-key columns still needs a
                    // non-empty `BY`: group everything under literal 1.
                    if default_by.is_empty() {
                        default_by = vec![Expr::Literal(ScalarValue::Int64(Some(1)))];
                    }
                    time_index_expr = Expr::Column(Column::new(
                        Some(table_ref.clone()),
                        time_index_column.name.clone(),
                    ));
                }
            }
        }
        if matches!(time_index_expr, Expr::Wildcard { .. }) {
            TimeIndexNotFoundSnafu {
                table: schema.to_string(),
            }
            .fail()
        } else {
            Ok((time_index_expr, default_by))
        }
    }
}
/// Returns `true` when any expression in `exprs` contains a `range_fn`
/// marker call anywhere in its subtree.
fn have_range_in_exprs(exprs: &[Expr]) -> bool {
    exprs.iter().any(|expr| {
        let mut found = false;
        // Walk the subtree, stopping as soon as the marker is seen.
        let _ = expr.apply(|e| {
            if let Expr::ScalarFunction(func) = e {
                if func.name() == "range_fn" {
                    found = true;
                    return Ok(TreeNodeRecursion::Stop);
                }
            }
            Ok(TreeNodeRecursion::Continue)
        });
        found
    })
}
/// Returns `true` when `expr` is built exclusively from interval literals
/// and binary operators — the only shape allowed for evaluated durations.
fn interval_only_in_expr(expr: &Expr) -> bool {
    let mut only_interval = true;
    let _ = expr.apply(|e| {
        let allowed = matches!(
            e,
            Expr::Literal(ScalarValue::IntervalDayTime(_))
                | Expr::Literal(ScalarValue::IntervalMonthDayNano(_))
                | Expr::Literal(ScalarValue::IntervalYearMonth(_))
                | Expr::BinaryExpr(_)
        );
        if allowed {
            Ok(TreeNodeRecursion::Continue)
        } else {
            // First disallowed node decides the answer; stop walking.
            only_interval = false;
            Ok(TreeNodeRecursion::Stop)
        }
    });
    only_interval
}
#[cfg(test)]
mod test {
    use std::error::Error;
    use catalog::memory::MemoryCatalogManager;
    use catalog::RegisterTableRequest;
    use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
    use common_time::IntervalYearMonth;
    use datafusion_expr::{BinaryExpr, Operator};
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::{ColumnSchema, Schema};
    use session::context::QueryContext;
    use table::metadata::{TableInfoBuilder, TableMetaBuilder};
    use table::test_util::EmptyTable;
    use super::*;
    use crate::parser::QueryLanguageParser;
    use crate::{QueryEngineFactory, QueryEngineRef};

    /// Builds a query engine over an in-memory catalog holding table `test`
    /// with columns tag_0..tag_4 (string, primary key), `timestamp`
    /// (millisecond time index) and field_0..field_4 (nullable float64).
    async fn create_test_engine() -> QueryEngineRef {
        let table_name = "test".to_string();
        let mut columns = vec![];
        for i in 0..5 {
            columns.push(ColumnSchema::new(
                format!("tag_{i}"),
                ConcreteDataType::string_datatype(),
                false,
            ));
        }
        columns.push(
            ColumnSchema::new(
                "timestamp".to_string(),
                ConcreteDataType::timestamp_millisecond_datatype(),
                false,
            )
            .with_time_index(true),
        );
        for i in 0..5 {
            columns.push(ColumnSchema::new(
                format!("field_{i}"),
                ConcreteDataType::float64_datatype(),
                true,
            ));
        }
        let schema = Arc::new(Schema::new(columns));
        let table_meta = TableMetaBuilder::default()
            .schema(schema)
            .primary_key_indices((0..5).collect())
            .value_indices((6..11).collect())
            .next_column_id(1024)
            .build()
            .unwrap();
        let table_info = TableInfoBuilder::default()
            .name(&table_name)
            .meta(table_meta)
            .build()
            .unwrap();
        let table = EmptyTable::from_table_info(&table_info);
        let catalog_list = MemoryCatalogManager::with_default_setup();
        assert!(catalog_list
            .register_table_sync(RegisterTableRequest {
                catalog: DEFAULT_CATALOG_NAME.to_string(),
                schema: DEFAULT_SCHEMA_NAME.to_string(),
                table_name,
                table_id: 1024,
                table,
            })
            .is_ok());
        QueryEngineFactory::new(catalog_list, None, None, None, None, false).query_engine()
    }

    /// Plans `sql` against the test engine and returns the logical plan.
    async fn do_query(sql: &str) -> Result<LogicalPlan> {
        let stmt = QueryLanguageParser::parse_sql(sql, &QueryContext::arc()).unwrap();
        let engine = create_test_engine().await;
        engine.planner().plan(&stmt, QueryContext::arc()).await
    }

    /// Asserts that planning `sql` produces exactly `expected`
    /// (indented plan display including schemas).
    async fn query_plan_compare(sql: &str, expected: String) {
        let plan = do_query(sql).await.unwrap();
        assert_eq!(plan.display_indent_schema().to_string(), expected);
    }

    // Plain select of the range output: RangeSelect already projects the
    // final schema, so no extra Projection node is generated.
    #[tokio::test]
    async fn range_no_project() {
        let query = r#"SELECT timestamp, tag_0, tag_1, avg(field_0 + field_1) RANGE '5m' FROM test ALIGN '1h' by (tag_0,tag_1);"#;
        let expected = String::from(
            "RangeSelect: range_exprs=[AVG(test.field_0 + test.field_1) RANGE 5m], align=3600000ms, align_to=0ms, align_by=[test.tag_0, test.tag_1], time_index=timestamp [timestamp:Timestamp(Millisecond, None), tag_0:Utf8, tag_1:Utf8, AVG(test.field_0 + test.field_1) RANGE 5m:Float64;N]\
            \n  TableScan: test [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N, field_3:Float64;N, field_4:Float64;N]"
        );
        query_plan_compare(query, expected).await;
    }

    // Arithmetic on top of a range expression keeps a Projection above
    // the RangeSelect node.
    #[tokio::test]
    async fn range_expr_calculation() {
        let query = r#"SELECT (avg(field_0 + field_1)/4) RANGE '5m' FROM test ALIGN '1h' by (tag_0,tag_1);"#;
        let expected = String::from(
            "Projection: AVG(test.field_0 + test.field_1) RANGE 5m / Int64(4) [AVG(test.field_0 + test.field_1) RANGE 5m / Int64(4):Float64;N]\
            \n  RangeSelect: range_exprs=[AVG(test.field_0 + test.field_1) RANGE 5m], align=3600000ms, align_to=0ms, align_by=[test.tag_0, test.tag_1], time_index=timestamp [AVG(test.field_0 + test.field_1) RANGE 5m:Float64;N, timestamp:Timestamp(Millisecond, None), tag_0:Utf8, tag_1:Utf8]\
            \n    TableScan: test [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N, field_3:Float64;N, field_4:Float64;N]"
        );
        query_plan_compare(query, expected).await;
    }

    // Multi-argument aggregate (covariance); no BY clause, so the default
    // BY is the primary-key columns tag_0..tag_4.
    #[tokio::test]
    async fn range_multi_args() {
        let query =
            r#"SELECT (covar(field_0 + field_1, field_1)/4) RANGE '5m' FROM test ALIGN '1h';"#;
        let expected = String::from(
            "Projection: covar_samp(test.field_0 + test.field_1,test.field_1) RANGE 5m / Int64(4) [covar_samp(test.field_0 + test.field_1,test.field_1) RANGE 5m / Int64(4):Float64;N]\
            \n  RangeSelect: range_exprs=[covar_samp(test.field_0 + test.field_1,test.field_1) RANGE 5m], align=3600000ms, align_to=0ms, align_by=[test.tag_0, test.tag_1, test.tag_2, test.tag_3, test.tag_4], time_index=timestamp [covar_samp(test.field_0 + test.field_1,test.field_1) RANGE 5m:Float64;N, timestamp:Timestamp(Millisecond, None), tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8]\
            \n    TableScan: test [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N, field_3:Float64;N, field_4:Float64;N]"
        );
        query_plan_compare(query, expected).await;
    }

    // Two range expressions combined in one calculation with FILL NULL.
    #[tokio::test]
    async fn range_calculation() {
        let query = r#"SELECT ((avg(field_0)+sum(field_1))/4) RANGE '5m' FROM test ALIGN '1h' by (tag_0,tag_1) FILL NULL;"#;
        let expected = String::from(
            "Projection: (AVG(test.field_0) RANGE 5m FILL NULL + SUM(test.field_1) RANGE 5m FILL NULL) / Int64(4) [AVG(test.field_0) RANGE 5m FILL NULL + SUM(test.field_1) RANGE 5m FILL NULL / Int64(4):Float64;N]\
            \n  RangeSelect: range_exprs=[AVG(test.field_0) RANGE 5m FILL NULL, SUM(test.field_1) RANGE 5m FILL NULL], align=3600000ms, align_to=0ms, align_by=[test.tag_0, test.tag_1], time_index=timestamp [AVG(test.field_0) RANGE 5m FILL NULL:Float64;N, SUM(test.field_1) RANGE 5m FILL NULL:Float64;N, timestamp:Timestamp(Millisecond, None), tag_0:Utf8, tag_1:Utf8]\
            \n    TableScan: test [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N, field_3:Float64;N, field_4:Float64;N]"
        );
        query_plan_compare(query, expected).await;
    }

    // Range query usable as a sub-query with outer filter/projection.
    #[tokio::test]
    async fn range_as_sub_query() {
        let query = r#"SELECT foo + 1 from (SELECT ((avg(field_0)+sum(field_1))/4) RANGE '5m' as foo FROM test ALIGN '1h' by (tag_0,tag_1) FILL NULL) where foo > 1;"#;
        let expected = String::from(
            "Projection: foo + Int64(1) [foo + Int64(1):Float64;N]\
            \n  Filter: foo > Int64(1) [foo:Float64;N]\
            \n    Projection: (AVG(test.field_0) RANGE 5m FILL NULL + SUM(test.field_1) RANGE 5m FILL NULL) / Int64(4) AS foo [foo:Float64;N]\
            \n      RangeSelect: range_exprs=[AVG(test.field_0) RANGE 5m FILL NULL, SUM(test.field_1) RANGE 5m FILL NULL], align=3600000ms, align_to=0ms, align_by=[test.tag_0, test.tag_1], time_index=timestamp [AVG(test.field_0) RANGE 5m FILL NULL:Float64;N, SUM(test.field_1) RANGE 5m FILL NULL:Float64;N, timestamp:Timestamp(Millisecond, None), tag_0:Utf8, tag_1:Utf8]\
            \n        TableScan: test [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N, field_3:Float64;N, field_4:Float64;N]"
        );
        query_plan_compare(query, expected).await;
    }

    // Range query over aliased columns coming from an inner sub-query.
    #[tokio::test]
    async fn range_from_nest_query() {
        let query = r#"SELECT ((avg(a)+sum(b))/4) RANGE '5m' FROM (SELECT field_0 as a, field_1 as b, tag_0 as c, tag_1 as d, timestamp from test where field_0 > 1.0) ALIGN '1h' by (c, d) FILL NULL;"#;
        let expected = String::from(
            "Projection: (AVG(a) RANGE 5m FILL NULL + SUM(b) RANGE 5m FILL NULL) / Int64(4) [AVG(a) RANGE 5m FILL NULL + SUM(b) RANGE 5m FILL NULL / Int64(4):Float64;N]\
            \n  RangeSelect: range_exprs=[AVG(a) RANGE 5m FILL NULL, SUM(b) RANGE 5m FILL NULL], align=3600000ms, align_to=0ms, align_by=[c, d], time_index=timestamp [AVG(a) RANGE 5m FILL NULL:Float64;N, SUM(b) RANGE 5m FILL NULL:Float64;N, timestamp:Timestamp(Millisecond, None), c:Utf8, d:Utf8]\
            \n    Projection: test.field_0 AS a, test.field_1 AS b, test.tag_0 AS c, test.tag_1 AS d, test.timestamp [a:Float64;N, b:Float64;N, c:Utf8, d:Utf8, timestamp:Timestamp(Millisecond, None)]\
            \n      Filter: test.field_0 > Float64(1) [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N, field_3:Float64;N, field_4:Float64;N]\
            \n        TableScan: test [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N, field_3:Float64;N, field_4:Float64;N]"
        );
        query_plan_compare(query, expected).await;
    }

    // Range expression nested inside a scalar function call.
    #[tokio::test]
    async fn range_in_expr() {
        let query = r#"SELECT sin(avg(field_0 + field_1) RANGE '5m' + 1) FROM test ALIGN '1h' by (tag_0,tag_1);"#;
        let expected = String::from(
            "Projection: sin(AVG(test.field_0 + test.field_1) RANGE 5m + Int64(1)) [sin(AVG(test.field_0 + test.field_1) RANGE 5m + Int64(1)):Float64;N]\
            \n  RangeSelect: range_exprs=[AVG(test.field_0 + test.field_1) RANGE 5m], align=3600000ms, align_to=0ms, align_by=[test.tag_0, test.tag_1], time_index=timestamp [AVG(test.field_0 + test.field_1) RANGE 5m:Float64;N, timestamp:Timestamp(Millisecond, None), tag_0:Utf8, tag_1:Utf8]\
            \n    TableScan: test [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N, field_3:Float64;N, field_4:Float64;N]"
        );
        query_plan_compare(query, expected).await;
    }

    // Identical range expressions are deduplicated by the BTreeSet:
    // only one range_expr appears in the RangeSelect node.
    #[tokio::test]
    async fn duplicate_range_expr() {
        let query = r#"SELECT avg(field_0) RANGE '5m' FILL 6.0 + avg(field_0) RANGE '5m' FILL 6.0 FROM test ALIGN '1h' by (tag_0,tag_1);"#;
        let expected = String::from(
            "Projection: AVG(test.field_0) RANGE 5m FILL 6 + AVG(test.field_0) RANGE 5m FILL 6 [AVG(test.field_0) RANGE 5m FILL 6 + AVG(test.field_0) RANGE 5m FILL 6:Float64]\
            \n  RangeSelect: range_exprs=[AVG(test.field_0) RANGE 5m FILL 6], align=3600000ms, align_to=0ms, align_by=[test.tag_0, test.tag_1], time_index=timestamp [AVG(test.field_0) RANGE 5m FILL 6:Float64, timestamp:Timestamp(Millisecond, None), tag_0:Utf8, tag_1:Utf8]\
            \n    TableScan: test [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N, field_3:Float64;N, field_4:Float64;N]"
        );
        query_plan_compare(query, expected).await;
    }

    // Range expression buried several levels deep in scalar functions.
    #[tokio::test]
    async fn deep_nest_range_expr() {
        let query = r#"SELECT round(sin(avg(field_0 + field_1) RANGE '5m' + 1)) FROM test ALIGN '1h' by (tag_0,tag_1);"#;
        let expected = String::from(
            "Projection: round(sin(AVG(test.field_0 + test.field_1) RANGE 5m + Int64(1))) [round(sin(AVG(test.field_0 + test.field_1) RANGE 5m + Int64(1))):Float64;N]\
            \n  RangeSelect: range_exprs=[AVG(test.field_0 + test.field_1) RANGE 5m], align=3600000ms, align_to=0ms, align_by=[test.tag_0, test.tag_1], time_index=timestamp [AVG(test.field_0 + test.field_1) RANGE 5m:Float64;N, timestamp:Timestamp(Millisecond, None), tag_0:Utf8, tag_1:Utf8]\
            \n    TableScan: test [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N, field_3:Float64;N, field_4:Float64;N]"
        );
        query_plan_compare(query, expected).await;
    }

    // Several distinct range expressions mixed with casts and functions.
    #[tokio::test]
    async fn complex_range_expr() {
        let query = r#"SELECT gcd(CAST(max(field_0 + 1) Range '5m' FILL NULL AS Int64), CAST(tag_0 AS Int64)) + round(max(field_2+1) Range '6m' FILL NULL + 1) + max(field_2+3) Range '10m' FILL NULL * CAST(tag_1 AS Float64) + 1 FROM test ALIGN '1h' by (tag_0, tag_1);"#;
        let expected = String::from(
            "Projection: gcd(arrow_cast(MAX(test.field_0 + Int64(1)) RANGE 5m FILL NULL, Utf8(\"Int64\")), arrow_cast(test.tag_0, Utf8(\"Int64\"))) + round(MAX(test.field_2 + Int64(1)) RANGE 6m FILL NULL + Int64(1)) + MAX(test.field_2 + Int64(3)) RANGE 10m FILL NULL * arrow_cast(test.tag_1, Utf8(\"Float64\")) + Int64(1) [gcd(arrow_cast(MAX(test.field_0 + Int64(1)) RANGE 5m FILL NULL,Utf8(\"Int64\")),arrow_cast(test.tag_0,Utf8(\"Int64\"))) + round(MAX(test.field_2 + Int64(1)) RANGE 6m FILL NULL + Int64(1)) + MAX(test.field_2 + Int64(3)) RANGE 10m FILL NULL * arrow_cast(test.tag_1,Utf8(\"Float64\")) + Int64(1):Float64;N]\
            \n  RangeSelect: range_exprs=[MAX(test.field_0 + Int64(1)) RANGE 5m FILL NULL, MAX(test.field_2 + Int64(1)) RANGE 6m FILL NULL, MAX(test.field_2 + Int64(3)) RANGE 10m FILL NULL], align=3600000ms, align_to=0ms, align_by=[test.tag_0, test.tag_1], time_index=timestamp [MAX(test.field_0 + Int64(1)) RANGE 5m FILL NULL:Float64;N, MAX(test.field_2 + Int64(1)) RANGE 6m FILL NULL:Float64;N, MAX(test.field_2 + Int64(3)) RANGE 10m FILL NULL:Float64;N, timestamp:Timestamp(Millisecond, None), tag_0:Utf8, tag_1:Utf8]\
            \n    TableScan: test [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N, field_3:Float64;N, field_4:Float64;N]"
        );
        query_plan_compare(query, expected).await;
    }

    // FILL LINEAR on an integer aggregate widens the output to Float64.
    #[tokio::test]
    async fn range_linear_on_integer() {
        let query = r#"SELECT min(CAST(field_0 AS Int64) + CAST(field_1 AS Int64)) RANGE '5m' FILL LINEAR FROM test ALIGN '1h' by (tag_0,tag_1);"#;
        let expected = String::from(
            "RangeSelect: range_exprs=[MIN(arrow_cast(test.field_0,Utf8(\"Int64\")) + arrow_cast(test.field_1,Utf8(\"Int64\"))) RANGE 5m FILL LINEAR], align=3600000ms, align_to=0ms, align_by=[test.tag_0, test.tag_1], time_index=timestamp [MIN(arrow_cast(test.field_0,Utf8(\"Int64\")) + arrow_cast(test.field_1,Utf8(\"Int64\"))) RANGE 5m FILL LINEAR:Float64;N]\
            \n  TableScan: test [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N, field_3:Float64;N, field_4:Float64;N]"
        );
        query_plan_compare(query, expected).await;
    }

    // Nesting one range query inside another must be rejected.
    #[tokio::test]
    async fn range_nest_range_err() {
        let query = r#"SELECT sum(avg(field_0 + field_1) RANGE '5m' + 1) RANGE '5m' + 1 FROM test ALIGN '1h' by (tag_0,tag_1);"#;
        assert_eq!(
            do_query(query).await.unwrap_err().to_string(),
            "Range Query: Nest Range Query is not allowed"
        )
    }

    // Hand-written range_fn with a string where the aggregate expr should be.
    #[tokio::test]
    async fn range_argument_err_1() {
        let query = r#"SELECT range_fn('5m', avg(field_0), 'NULL', '1', tag_0, '1h') FROM test group by tag_0;"#;
        let error = do_query(query)
            .await
            .unwrap_err()
            .source()
            .unwrap()
            .to_string();
        assert_eq!(
            error,
            "Error during planning: Illegal argument `Utf8(\"5m\")` in range select query"
        )
    }

    // Hand-written range_fn with a bare integer as the range duration.
    #[tokio::test]
    async fn range_argument_err_2() {
        let query = r#"SELECT range_fn(avg(field_0), 5, 'NULL', '1', tag_0, '1h') FROM test group by tag_0;"#;
        let error = do_query(query)
            .await
            .unwrap_err()
            .source()
            .unwrap()
            .to_string();
        assert_eq!(
            error,
            "Error during planning: Illegal argument `Int64(5)` in range select query"
        )
    }

    // Duration parsing: year/month intervals rejected, day-time and
    // month-day-nano converted, PromQL strings parsed, interval
    // arithmetic folded, non-positive and mixed-type results rejected.
    #[test]
    fn test_parse_duration_expr() {
        let interval = IntervalYearMonth::new(10);
        let args = vec![Expr::Literal(ScalarValue::IntervalYearMonth(Some(
            interval.to_i32(),
        )))];
        assert!(parse_duration_expr(&args, 0).is_err(),);
        let interval = IntervalDayTime::new(10, 10);
        let args = vec![Expr::Literal(ScalarValue::IntervalDayTime(Some(
            interval.to_i64(),
        )))];
        assert_eq!(
            parse_duration_expr(&args, 0).unwrap().as_millis() as i64,
            interval.as_millis()
        );
        let interval = IntervalMonthDayNano::new(0, 10, 10);
        let args = vec![Expr::Literal(ScalarValue::IntervalMonthDayNano(Some(
            interval.to_i128(),
        )))];
        assert_eq!(
            parse_duration_expr(&args, 0).unwrap().as_millis() as i64,
            interval.days as i64 * MS_PER_DAY + interval.nanoseconds / NANOS_PER_MILLI,
        );
        let args = vec![Expr::Literal(ScalarValue::Utf8(Some("1y4w".into())))];
        assert_eq!(
            parse_duration_expr(&args, 0).unwrap(),
            parse_duration("1y4w").unwrap()
        );
        // Out-of-bounds index must error rather than panic.
        assert!(parse_duration_expr(&args, 10).is_err());
        let args = vec![Expr::BinaryExpr(BinaryExpr {
            left: Box::new(Expr::Literal(ScalarValue::IntervalDayTime(Some(
                IntervalDayTime::new(0, 10).to_i64(),
            )))),
            op: Operator::Plus,
            right: Box::new(Expr::Literal(ScalarValue::IntervalDayTime(Some(
                IntervalDayTime::new(0, 10).to_i64(),
            )))),
        })];
        assert_eq!(
            parse_duration_expr(&args, 0).unwrap(),
            Duration::from_millis(20)
        );
        // `10ms - 10ms` folds to zero, which is not a legal duration.
        let args = vec![Expr::BinaryExpr(BinaryExpr {
            left: Box::new(Expr::Literal(ScalarValue::IntervalDayTime(Some(
                IntervalDayTime::new(0, 10).to_i64(),
            )))),
            op: Operator::Minus,
            right: Box::new(Expr::Literal(ScalarValue::IntervalDayTime(Some(
                IntervalDayTime::new(0, 10).to_i64(),
            )))),
        })];
        assert!(parse_duration_expr(&args, 0).is_err());
        // Mixing an interval with a time literal is not interval-only.
        let args = vec![Expr::BinaryExpr(BinaryExpr {
            left: Box::new(Expr::Literal(ScalarValue::IntervalYearMonth(Some(
                IntervalYearMonth::new(10).to_i32(),
            )))),
            op: Operator::Minus,
            right: Box::new(Expr::Literal(ScalarValue::Time64Microsecond(Some(0)))),
        })];
        assert!(parse_duration_expr(&args, 0).is_err());
    }

    // ALIGN TO parsing: NOW, calendar (empty string, with/without timezone),
    // explicit timestamps with offsets/timezones, and interval arithmetic.
    #[test]
    fn test_parse_align_to() {
        let args = vec![Expr::Literal(ScalarValue::Utf8(Some("NOW".into())))];
        let epsinon = parse_align_to(&args, 0, None).unwrap() - Timestamp::current_millis().value();
        assert!(epsinon.abs() < 100);
        let args = vec![Expr::Literal(ScalarValue::Utf8(Some("".into())))];
        assert_eq!(0, parse_align_to(&args, 0, None).unwrap());
        let args = vec![Expr::Literal(ScalarValue::Utf8(Some("".into())))];
        assert_eq!(
            -36000 * 1000,
            parse_align_to(&args, 0, Some(&Timezone::from_tz_string("HST").unwrap())).unwrap()
        );
        assert_eq!(
            28800 * 1000,
            parse_align_to(
                &args,
                0,
                Some(&Timezone::from_tz_string("Asia/Shanghai").unwrap())
            )
            .unwrap()
        );
        let args = vec![Expr::Literal(ScalarValue::Utf8(Some(
            "1970-01-01T00:00:00+08:00".into(),
        )))];
        assert_eq!(parse_align_to(&args, 0, None).unwrap(), -8 * 60 * 60 * 1000);
        let args = vec![Expr::Literal(ScalarValue::Utf8(Some(
            "1970-01-01T00:00:00".into(),
        )))];
        assert_eq!(
            parse_align_to(
                &args,
                0,
                Some(&Timezone::from_tz_string("Asia/Shanghai").unwrap())
            )
            .unwrap(),
            -8 * 60 * 60 * 1000
        );
        let args = vec![Expr::BinaryExpr(BinaryExpr {
            left: Box::new(Expr::Literal(ScalarValue::IntervalDayTime(Some(
                IntervalDayTime::new(0, 10).to_i64(),
            )))),
            op: Operator::Plus,
            right: Box::new(Expr::Literal(ScalarValue::IntervalDayTime(Some(
                IntervalDayTime::new(0, 10).to_i64(),
            )))),
        })];
        assert_eq!(parse_align_to(&args, 0, None).unwrap(), 20);
    }

    // A duration literal disqualifies an expression from being
    // interval-only; pure interval arithmetic qualifies.
    #[test]
    fn test_interval_only() {
        let expr = Expr::BinaryExpr(BinaryExpr {
            left: Box::new(Expr::Literal(ScalarValue::DurationMillisecond(Some(20)))),
            op: Operator::Minus,
            right: Box::new(Expr::Literal(ScalarValue::IntervalDayTime(Some(
                IntervalDayTime::new(10, 0).to_i64(),
            )))),
        });
        assert!(!interval_only_in_expr(&expr));
        let expr = Expr::BinaryExpr(BinaryExpr {
            left: Box::new(Expr::Literal(ScalarValue::IntervalDayTime(Some(
                IntervalDayTime::new(10, 0).to_i64(),
            )))),
            op: Operator::Minus,
            right: Box::new(Expr::Literal(ScalarValue::IntervalDayTime(Some(
                IntervalDayTime::new(10, 0).to_i64(),
            )))),
        });
        assert!(interval_only_in_expr(&expr));
    }
}