flow/expr/
utils.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! This module contains utility functions for expressions.
16
17use std::cmp::Ordering;
18use std::collections::BTreeMap;
19
20use datatypes::value::Value;
21use snafu::{ensure, OptionExt};
22
23use crate::error::UnexpectedSnafu;
24use crate::expr::ScalarExpr;
25use crate::plan::TypedPlan;
26use crate::Result;
27
28/// Find lower bound for time `current` in given `plan` for the time window expr.
29///
30/// i.e. for time window expr being `date_bin(INTERVAL '5 minutes', ts) as time_window` and `current="2021-07-01 00:01:01.000"`,
31/// return `Some("2021-07-01 00:00:00.000")`
32///
33/// if `plan` doesn't contain a `TIME INDEX` column, return `None`
34pub fn find_plan_time_window_expr_lower_bound(
35    plan: &TypedPlan,
36    current: common_time::Timestamp,
37) -> Result<Option<common_time::Timestamp>> {
38    let typ = plan.schema.typ();
39    let Some(mut time_index) = typ.time_index else {
40        return Ok(None);
41    };
42
43    let mut cur_plan = plan;
44    let mut expr_time_index;
45
46    loop {
47        // follow upward and find deepest time index expr that is not a column ref
48        expr_time_index = Some(cur_plan.plan.get_nth_expr(time_index).context(
49            UnexpectedSnafu {
50                reason: "Failed to find time index expr",
51            },
52        )?);
53
54        if let Some(ScalarExpr::Column(i)) = expr_time_index {
55            time_index = i;
56        } else {
57            break;
58        }
59        if let Some(input) = cur_plan.plan.get_first_input_plan() {
60            cur_plan = input;
61        } else {
62            break;
63        }
64    }
65
66    let expr_time_index = expr_time_index.context(UnexpectedSnafu {
67        reason: "Failed to find time index expr",
68    })?;
69
70    let ts_col = expr_time_index
71        .get_all_ref_columns()
72        .first()
73        .cloned()
74        .context(UnexpectedSnafu {
75            reason: "Failed to find time index column",
76        })?;
77
78    find_time_window_lower_bound(&expr_time_index, ts_col, current)
79}
80
81/// Find the lower bound of time window in given `expr` and `current` timestamp.
82///
83/// i.e. for `current="2021-07-01 00:01:01.000"` and `expr=date_bin(INTERVAL '5 minutes', ts) as time_window` and `ts_col=ts`,
84/// return `Some("2021-07-01 00:00:00.000")` since it's the lower bound
85/// of current time window given the current timestamp
86///
87/// if return None, meaning this time window have no lower bound
88pub fn find_time_window_lower_bound(
89    expr: &ScalarExpr,
90    ts_col_idx: usize,
91    current: common_time::Timestamp,
92) -> Result<Option<common_time::Timestamp>> {
93    let all_ref_columns = expr.get_all_ref_columns();
94
95    ensure!(
96        all_ref_columns.contains(&ts_col_idx),
97        UnexpectedSnafu {
98            reason: format!(
99                "Expected column {} to be referenced in expression {expr:?}",
100                ts_col_idx
101            ),
102        }
103    );
104
105    ensure!(all_ref_columns.len() == 1, UnexpectedSnafu {
106        reason: format!(
107            "Expect only one column to be referenced in expression {expr:?}, found {all_ref_columns:?}"
108        ),
109    });
110
111    let permute_map = BTreeMap::from([(ts_col_idx, 0usize)]);
112
113    let mut rewrote_expr = expr.clone();
114
115    rewrote_expr.permute_map(&permute_map)?;
116
117    fn eval_to_timestamp(expr: &ScalarExpr, values: &[Value]) -> Result<common_time::Timestamp> {
118        let val = expr.eval(values)?;
119        if let Value::Timestamp(ts) = val {
120            Ok(ts)
121        } else {
122            UnexpectedSnafu {
123                reason: format!("Expected timestamp in expression {expr:?} but got {val:?}"),
124            }
125            .fail()?
126        }
127    }
128
129    let cur_time_window = eval_to_timestamp(&rewrote_expr, &[current.into()])?;
130
131    // search to find the lower bound
132    let mut offset: i64 = 1;
133    let lower_bound;
134    let mut upper_bound = Some(current);
135    // first expontial probe to found a range for binary search
136    loop {
137        let Some(next_val) = current.value().checked_sub(offset) else {
138            // no lower bound
139            return Ok(None);
140        };
141
142        let prev_time_probe = common_time::Timestamp::new(next_val, current.unit());
143
144        let prev_time_window = eval_to_timestamp(&rewrote_expr, &[prev_time_probe.into()])?;
145
146        match prev_time_window.cmp(&cur_time_window) {
147            Ordering::Less => {
148                lower_bound = Some(prev_time_probe);
149                break;
150            }
151            Ordering::Equal => {
152                upper_bound = Some(prev_time_probe);
153            }
154            Ordering::Greater => {
155                UnexpectedSnafu {
156                    reason: format!(
157                        "Unsupported time window expression {rewrote_expr:?}, expect monotonic increasing for time window expression {expr:?}"
158                    ),
159                }
160                .fail()?
161            }
162        }
163
164        let Some(new_offset) = offset.checked_mul(2) else {
165            // no lower bound
166            return Ok(None);
167        };
168        offset = new_offset;
169    }
170
171    // binary search for the lower bound
172
173    ensure!(lower_bound.map(|v|v.unit())==upper_bound.map(|v|v.unit()), UnexpectedSnafu{
174        reason: format!(" unit mismatch for time window expression {expr:?}, found {lower_bound:?} and {upper_bound:?}"),
175    });
176
177    let output_unit = lower_bound.expect("should have lower bound").unit();
178
179    let mut low = lower_bound.expect("should have lower bound").value();
180    let mut high = upper_bound.expect("should have upper bound").value();
181    while low < high {
182        let mid = (low + high) / 2;
183        let mid_probe = common_time::Timestamp::new(mid, output_unit);
184        let mid_time_window = eval_to_timestamp(&rewrote_expr, &[mid_probe.into()])?;
185
186        match mid_time_window.cmp(&cur_time_window) {
187            Ordering::Less => low = mid + 1,
188            Ordering::Equal => high = mid,
189            Ordering::Greater => UnexpectedSnafu {
190                reason: format!("Binary search failed for time window expression {expr:?}"),
191            }
192            .fail()?,
193        }
194    }
195
196    let final_lower_bound_for_time_window = common_time::Timestamp::new(low, output_unit);
197
198    Ok(Some(final_lower_bound_for_time_window))
199}
200
#[cfg(test)]
mod test {
    use pretty_assertions::assert_eq;

    use super::*;
    use crate::plan::{Plan, TypedPlan};
    use crate::test_utils::{create_test_ctx, create_test_query_engine, sql_to_substrait};

    /// Parse a timestamp literal without an explicit timezone, panicking on
    /// malformed input.
    fn parse_ts(literal: &str) -> common_time::Timestamp {
        common_time::Timestamp::from_str(literal, None).unwrap()
    }

    /// Checks the plan-level entry point: queries without a time index yield
    /// `None`, queries grouped by a `date_bin` window yield the window start.
    #[tokio::test]
    async fn test_plan_time_window_lower_bound() {
        let engine = create_test_query_engine();

        // (sql, current timestamp, expected lower bound)
        let cases = [
            // no time index
            (
                "SELECT date_bin('5 minutes', ts) FROM numbers_with_ts;",
                "2021-07-01 00:01:01.000",
                None,
            ),
            // time index
            (
                "SELECT date_bin('5 minutes', ts) as time_window FROM numbers_with_ts GROUP BY time_window;",
                "2021-07-01 00:01:01.000",
                Some("2021-07-01 00:00:00.000"),
            ),
            // time index with other fields
            (
                "SELECT sum(number) as sum_up, date_bin('5 minutes', ts) as time_window FROM numbers_with_ts GROUP BY time_window;",
                "2021-07-01 00:01:01.000",
                Some("2021-07-01 00:00:00.000"),
            ),
            // time index with other pks
            (
                "SELECT number, date_bin('5 minutes', ts) as time_window FROM numbers_with_ts GROUP BY time_window, number;",
                "2021-07-01 00:01:01.000",
                Some("2021-07-01 00:00:00.000"),
            ),
        ];

        for (sql, now, want) in cases {
            let substrait = sql_to_substrait(engine.clone(), sql).await;
            let mut ctx = create_test_ctx();
            let flow_plan = TypedPlan::from_substrait_plan(&mut ctx, &substrait)
                .await
                .unwrap();

            let now = parse_ts(now);
            let want = want.map(parse_ts);

            let got = find_plan_time_window_expr_lower_bound(&flow_plan, now).unwrap();
            assert_eq!(got, want);
        }
    }

    /// Checks the expression-level search directly on the `date_bin` expression
    /// extracted from the plan, including window-edge timestamps.
    #[tokio::test]
    async fn test_timewindow_lower_bound() {
        let engine = create_test_query_engine();

        // ((interval, column, optional origin), current timestamp, expected lower bound)
        let cases = [
            (
                ("'5 minutes'", "ts", Some("2021-07-01 00:00:00.000")),
                "2021-07-01 00:01:01.000",
                "2021-07-01 00:00:00.000",
            ),
            (
                ("'5 minutes'", "ts", None),
                "2021-07-01 00:01:01.000",
                "2021-07-01 00:00:00.000",
            ),
            (
                ("'5 minutes'", "ts", None),
                "2021-07-01 00:00:00.000",
                "2021-07-01 00:00:00.000",
            ),
            // test edge cases
            (
                ("'5 minutes'", "ts", None),
                "2021-07-01 00:05:00.000",
                "2021-07-01 00:05:00.000",
            ),
            (
                ("'5 minutes'", "ts", None),
                "2021-07-01 00:04:59.999",
                "2021-07-01 00:00:00.000",
            ),
            (
                ("'5 minutes'", "ts", None),
                "2021-07-01 00:04:59.999999999",
                "2021-07-01 00:00:00.000",
            ),
            (
                ("'5 minutes'", "ts", None),
                "2021-07-01 00:04:59.999999999999",
                "2021-07-01 00:00:00.000",
            ),
            (
                ("'5 minutes'", "ts", None),
                "2021-07-01 00:04:59.999999999999999",
                "2021-07-01 00:00:00.000",
            ),
        ];

        for ((interval, column, origin), now, want) in cases {
            let sql = match origin {
                Some(origin) => format!(
                    "SELECT date_bin({}, {}, '{origin}') FROM numbers_with_ts;",
                    interval, column
                ),
                None => format!(
                    "SELECT date_bin({}, {}) FROM numbers_with_ts;",
                    interval, column
                ),
            };

            let substrait = sql_to_substrait(engine.clone(), &sql).await;
            let mut ctx = create_test_ctx();
            let flow_plan = TypedPlan::from_substrait_plan(&mut ctx, &substrait)
                .await
                .unwrap();

            // the projection is expected to be a single Mfp whose first
            // expression is the `date_bin` call
            let Plan::Mfp { mfp, .. } = flow_plan.plan else {
                unreachable!()
            };
            let expr = mfp.expressions[0].clone();

            let got = find_time_window_lower_bound(&expr, 1, parse_ts(now)).unwrap();

            assert_eq!(got, Some(parse_ts(want)));
        }
    }
}