common_query/logical_plan/
expr.rs1use common_time::range::TimestampRange;
16use common_time::timestamp::TimeUnit;
17use common_time::Timestamp;
18use datafusion_common::{Column, ScalarValue};
19use datafusion_expr::expr::Expr;
20use datafusion_expr::{and, binary_expr, Operator};
21use datatypes::data_type::DataType;
22use datatypes::schema::ColumnSchema;
23use datatypes::value::Value;
24
25pub fn build_same_type_ts_filter(
28 ts_schema: &ColumnSchema,
29 time_range: Option<TimestampRange>,
30) -> Option<Expr> {
31 let time_range = time_range?;
32 let start = time_range
33 .start()
34 .and_then(|start| ts_schema.data_type.try_cast(Value::Timestamp(start)));
35 let end = time_range
36 .end()
37 .and_then(|end| ts_schema.data_type.try_cast(Value::Timestamp(end)));
38
39 let time_range = match (start, end) {
40 (Some(Value::Timestamp(start)), Some(Value::Timestamp(end))) => {
41 TimestampRange::new(start, end)
42 }
43 (Some(Value::Timestamp(start)), None) => Some(TimestampRange::from_start(start)),
44 (None, Some(Value::Timestamp(end))) => Some(TimestampRange::until_end(end, false)),
45 _ => return None,
46 };
47 build_filter_from_timestamp(&ts_schema.name, time_range.as_ref())
48}
49
50pub fn build_filter_from_timestamp(
53 ts_col_name: &str,
54 time_range: Option<&TimestampRange>,
55) -> Option<Expr> {
56 let time_range = time_range?;
57 let ts_col_expr = Expr::Column(Column::from_name(ts_col_name));
58
59 match (time_range.start(), time_range.end()) {
60 (None, None) => None,
61 (Some(start), None) => Some(binary_expr(
62 ts_col_expr,
63 Operator::GtEq,
64 timestamp_to_literal(start),
65 )),
66 (None, Some(end)) => Some(binary_expr(
67 ts_col_expr,
68 Operator::Lt,
69 timestamp_to_literal(end),
70 )),
71 (Some(start), Some(end)) => Some(and(
72 binary_expr(
73 ts_col_expr.clone(),
74 Operator::GtEq,
75 timestamp_to_literal(start),
76 ),
77 binary_expr(ts_col_expr, Operator::Lt, timestamp_to_literal(end)),
78 )),
79 }
80}
81
82fn timestamp_to_literal(timestamp: &Timestamp) -> Expr {
84 let scalar_value = match timestamp.unit() {
85 TimeUnit::Second => ScalarValue::TimestampSecond(Some(timestamp.value()), None),
86 TimeUnit::Millisecond => ScalarValue::TimestampMillisecond(Some(timestamp.value()), None),
87 TimeUnit::Microsecond => ScalarValue::TimestampMicrosecond(Some(timestamp.value()), None),
88 TimeUnit::Nanosecond => ScalarValue::TimestampNanosecond(Some(timestamp.value()), None),
89 };
90 Expr::Literal(scalar_value)
91}
92
93#[cfg(test)]
94mod tests {
95 use super::*;
96
97 #[test]
98 fn test_timestamp_to_literal() {
99 let timestamp = Timestamp::new(123456789, TimeUnit::Second);
100 let expected = Expr::Literal(ScalarValue::TimestampSecond(Some(123456789), None));
101 assert_eq!(timestamp_to_literal(×tamp), expected);
102
103 let timestamp = Timestamp::new(123456789, TimeUnit::Millisecond);
104 let expected = Expr::Literal(ScalarValue::TimestampMillisecond(Some(123456789), None));
105 assert_eq!(timestamp_to_literal(×tamp), expected);
106
107 let timestamp = Timestamp::new(123456789, TimeUnit::Microsecond);
108 let expected = Expr::Literal(ScalarValue::TimestampMicrosecond(Some(123456789), None));
109 assert_eq!(timestamp_to_literal(×tamp), expected);
110
111 let timestamp = Timestamp::new(123456789, TimeUnit::Nanosecond);
112 let expected = Expr::Literal(ScalarValue::TimestampNanosecond(Some(123456789), None));
113 assert_eq!(timestamp_to_literal(×tamp), expected);
114 }
115}