// common_query/logical_plan/expr.rs
use common_time::range::TimestampRange;
use common_time::timestamp::TimeUnit;
use common_time::Timestamp;
use datafusion_common::{Column, ScalarValue};
use datafusion_expr::expr::Expr;
use datafusion_expr::{and, binary_expr, Operator};
use datatypes::data_type::DataType;
use datatypes::schema::ColumnSchema;
use datatypes::value::Value;
pub fn build_same_type_ts_filter(
ts_schema: &ColumnSchema,
time_range: Option<TimestampRange>,
) -> Option<Expr> {
let ts_type = ts_schema.data_type.clone();
let time_range = time_range?;
let start = time_range
.start()
.and_then(|start| ts_type.try_cast(Value::Timestamp(start)));
let end = time_range
.end()
.and_then(|end| ts_type.try_cast(Value::Timestamp(end)));
let time_range = match (start, end) {
(Some(Value::Timestamp(start)), Some(Value::Timestamp(end))) => {
TimestampRange::new(start, end)
}
(Some(Value::Timestamp(start)), None) => Some(TimestampRange::from_start(start)),
(None, Some(Value::Timestamp(end))) => Some(TimestampRange::until_end(end, false)),
_ => return None,
};
build_filter_from_timestamp(&ts_schema.name, time_range.as_ref())
}
pub fn build_filter_from_timestamp(
ts_col_name: &str,
time_range: Option<&TimestampRange>,
) -> Option<Expr> {
let time_range = time_range?;
let ts_col_expr = Expr::Column(Column {
relation: None,
name: ts_col_name.to_string(),
});
match (time_range.start(), time_range.end()) {
(None, None) => None,
(Some(start), None) => Some(binary_expr(
ts_col_expr,
Operator::GtEq,
timestamp_to_literal(start),
)),
(None, Some(end)) => Some(binary_expr(
ts_col_expr,
Operator::Lt,
timestamp_to_literal(end),
)),
(Some(start), Some(end)) => Some(and(
binary_expr(
ts_col_expr.clone(),
Operator::GtEq,
timestamp_to_literal(start),
),
binary_expr(ts_col_expr, Operator::Lt, timestamp_to_literal(end)),
)),
}
}
/// Converts `timestamp` into a literal expression.
///
/// The scalar variant is picked from the timestamp's unit, the raw value is
/// carried over unchanged, and the variant's second field is always `None`.
fn timestamp_to_literal(timestamp: &Timestamp) -> Expr {
    // Every variant wraps the same payload; only the unit differs.
    let raw = Some(timestamp.value());
    Expr::Literal(match timestamp.unit() {
        TimeUnit::Second => ScalarValue::TimestampSecond(raw, None),
        TimeUnit::Millisecond => ScalarValue::TimestampMillisecond(raw, None),
        TimeUnit::Microsecond => ScalarValue::TimestampMicrosecond(raw, None),
        TimeUnit::Nanosecond => ScalarValue::TimestampNanosecond(raw, None),
    })
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Each time unit must map to the scalar variant of the same unit, with
    /// the raw value preserved and the second field left as `None`.
    #[test]
    fn test_timestamp_to_literal() {
        let timestamp = Timestamp::new(123456789, TimeUnit::Second);
        let expected = Expr::Literal(ScalarValue::TimestampSecond(Some(123456789), None));
        assert_eq!(timestamp_to_literal(&timestamp), expected);

        let timestamp = Timestamp::new(123456789, TimeUnit::Millisecond);
        let expected = Expr::Literal(ScalarValue::TimestampMillisecond(Some(123456789), None));
        assert_eq!(timestamp_to_literal(&timestamp), expected);

        let timestamp = Timestamp::new(123456789, TimeUnit::Microsecond);
        let expected = Expr::Literal(ScalarValue::TimestampMicrosecond(Some(123456789), None));
        assert_eq!(timestamp_to_literal(&timestamp), expected);

        let timestamp = Timestamp::new(123456789, TimeUnit::Nanosecond);
        let expected = Expr::Literal(ScalarValue::TimestampNanosecond(Some(123456789), None));
        assert_eq!(timestamp_to_literal(&timestamp), expected);
    }
}