1use std::cmp::Ordering;
18use std::collections::BTreeMap;
19
20use datatypes::value::Value;
21use snafu::{ensure, OptionExt};
22
23use crate::error::UnexpectedSnafu;
24use crate::expr::ScalarExpr;
25use crate::plan::TypedPlan;
26use crate::Result;
27
28pub fn find_plan_time_window_expr_lower_bound(
35 plan: &TypedPlan,
36 current: common_time::Timestamp,
37) -> Result<Option<common_time::Timestamp>> {
38 let typ = plan.schema.typ();
39 let Some(mut time_index) = typ.time_index else {
40 return Ok(None);
41 };
42
43 let mut cur_plan = plan;
44 let mut expr_time_index;
45
46 loop {
47 expr_time_index = Some(cur_plan.plan.get_nth_expr(time_index).context(
49 UnexpectedSnafu {
50 reason: "Failed to find time index expr",
51 },
52 )?);
53
54 if let Some(ScalarExpr::Column(i)) = expr_time_index {
55 time_index = i;
56 } else {
57 break;
58 }
59 if let Some(input) = cur_plan.plan.get_first_input_plan() {
60 cur_plan = input;
61 } else {
62 break;
63 }
64 }
65
66 let expr_time_index = expr_time_index.context(UnexpectedSnafu {
67 reason: "Failed to find time index expr",
68 })?;
69
70 let ts_col = expr_time_index
71 .get_all_ref_columns()
72 .first()
73 .cloned()
74 .context(UnexpectedSnafu {
75 reason: "Failed to find time index column",
76 })?;
77
78 find_time_window_lower_bound(&expr_time_index, ts_col, current)
79}
80
81pub fn find_time_window_lower_bound(
89 expr: &ScalarExpr,
90 ts_col_idx: usize,
91 current: common_time::Timestamp,
92) -> Result<Option<common_time::Timestamp>> {
93 let all_ref_columns = expr.get_all_ref_columns();
94
95 ensure!(
96 all_ref_columns.contains(&ts_col_idx),
97 UnexpectedSnafu {
98 reason: format!(
99 "Expected column {} to be referenced in expression {expr:?}",
100 ts_col_idx
101 ),
102 }
103 );
104
105 ensure!(all_ref_columns.len() == 1, UnexpectedSnafu {
106 reason: format!(
107 "Expect only one column to be referenced in expression {expr:?}, found {all_ref_columns:?}"
108 ),
109 });
110
111 let permute_map = BTreeMap::from([(ts_col_idx, 0usize)]);
112
113 let mut rewrote_expr = expr.clone();
114
115 rewrote_expr.permute_map(&permute_map)?;
116
117 fn eval_to_timestamp(expr: &ScalarExpr, values: &[Value]) -> Result<common_time::Timestamp> {
118 let val = expr.eval(values)?;
119 if let Value::Timestamp(ts) = val {
120 Ok(ts)
121 } else {
122 UnexpectedSnafu {
123 reason: format!("Expected timestamp in expression {expr:?} but got {val:?}"),
124 }
125 .fail()?
126 }
127 }
128
129 let cur_time_window = eval_to_timestamp(&rewrote_expr, &[current.into()])?;
130
131 let mut offset: i64 = 1;
133 let lower_bound;
134 let mut upper_bound = Some(current);
135 loop {
137 let Some(next_val) = current.value().checked_sub(offset) else {
138 return Ok(None);
140 };
141
142 let prev_time_probe = common_time::Timestamp::new(next_val, current.unit());
143
144 let prev_time_window = eval_to_timestamp(&rewrote_expr, &[prev_time_probe.into()])?;
145
146 match prev_time_window.cmp(&cur_time_window) {
147 Ordering::Less => {
148 lower_bound = Some(prev_time_probe);
149 break;
150 }
151 Ordering::Equal => {
152 upper_bound = Some(prev_time_probe);
153 }
154 Ordering::Greater => {
155 UnexpectedSnafu {
156 reason: format!(
157 "Unsupported time window expression {rewrote_expr:?}, expect monotonic increasing for time window expression {expr:?}"
158 ),
159 }
160 .fail()?
161 }
162 }
163
164 let Some(new_offset) = offset.checked_mul(2) else {
165 return Ok(None);
167 };
168 offset = new_offset;
169 }
170
171 ensure!(lower_bound.map(|v|v.unit())==upper_bound.map(|v|v.unit()), UnexpectedSnafu{
174 reason: format!(" unit mismatch for time window expression {expr:?}, found {lower_bound:?} and {upper_bound:?}"),
175 });
176
177 let output_unit = lower_bound.expect("should have lower bound").unit();
178
179 let mut low = lower_bound.expect("should have lower bound").value();
180 let mut high = upper_bound.expect("should have upper bound").value();
181 while low < high {
182 let mid = (low + high) / 2;
183 let mid_probe = common_time::Timestamp::new(mid, output_unit);
184 let mid_time_window = eval_to_timestamp(&rewrote_expr, &[mid_probe.into()])?;
185
186 match mid_time_window.cmp(&cur_time_window) {
187 Ordering::Less => low = mid + 1,
188 Ordering::Equal => high = mid,
189 Ordering::Greater => UnexpectedSnafu {
190 reason: format!("Binary search failed for time window expression {expr:?}"),
191 }
192 .fail()?,
193 }
194 }
195
196 let final_lower_bound_for_time_window = common_time::Timestamp::new(low, output_unit);
197
198 Ok(Some(final_lower_bound_for_time_window))
199}
200
#[cfg(test)]
mod test {
    use pretty_assertions::assert_eq;

    use super::*;
    use crate::plan::{Plan, TypedPlan};
    use crate::test_utils::{create_test_ctx, create_test_query_engine, sql_to_substrait};

    /// Runs `find_plan_time_window_expr_lower_bound` against whole SQL query plans.
    #[tokio::test]
    async fn test_plan_time_window_lower_bound() {
        // Each case: (sql, current timestamp, expected lower bound).
        let testcases = [
            (
                // Expected `None`: presumably this plan exposes no time index without a GROUP BY.
                "SELECT date_bin('5 minutes', ts) FROM numbers_with_ts;",
                "2021-07-01 00:01:01.000",
                None,
            ),
            (
                "SELECT date_bin('5 minutes', ts) as time_window FROM numbers_with_ts GROUP BY time_window;",
                "2021-07-01 00:01:01.000",
                Some("2021-07-01 00:00:00.000"),
            ),
            (
                "SELECT sum(number) as sum_up, date_bin('5 minutes', ts) as time_window FROM numbers_with_ts GROUP BY time_window;",
                "2021-07-01 00:01:01.000",
                Some("2021-07-01 00:00:00.000"),
            ),
            (
                "SELECT number, date_bin('5 minutes', ts) as time_window FROM numbers_with_ts GROUP BY time_window, number;",
                "2021-07-01 00:01:01.000",
                Some("2021-07-01 00:00:00.000"),
            ),
        ];
        let engine = create_test_query_engine();

        for (sql, current_str, expected_str) in testcases {
            let substrait_plan = sql_to_substrait(engine.clone(), sql).await;
            let mut ctx = create_test_ctx();
            let flow_plan = TypedPlan::from_substrait_plan(&mut ctx, &substrait_plan)
                .await
                .unwrap();

            let current = common_time::Timestamp::from_str(current_str, None).unwrap();
            let expected =
                expected_str.map(|ts| common_time::Timestamp::from_str(ts, None).unwrap());

            let actual = find_plan_time_window_expr_lower_bound(&flow_plan, current).unwrap();
            assert_eq!(actual, expected);
        }
    }

    /// Runs `find_time_window_lower_bound` directly on the first projected expression.
    #[tokio::test]
    async fn test_timewindow_lower_bound() {
        // Each case: ((interval, timestamp column, optional origin), current, expected lower bound).
        let testcases = [
            (
                ("'5 minutes'", "ts", Some("2021-07-01 00:00:00.000")),
                "2021-07-01 00:01:01.000",
                "2021-07-01 00:00:00.000",
            ),
            (
                ("'5 minutes'", "ts", None),
                "2021-07-01 00:01:01.000",
                "2021-07-01 00:00:00.000",
            ),
            // `current` exactly on a window boundary.
            (
                ("'5 minutes'", "ts", None),
                "2021-07-01 00:00:00.000",
                "2021-07-01 00:00:00.000",
            ),
            (
                ("'5 minutes'", "ts", None),
                "2021-07-01 00:05:00.000",
                "2021-07-01 00:05:00.000",
            ),
            // Just before the 00:05:00 boundary, with increasingly precise fractional seconds.
            (
                ("'5 minutes'", "ts", None),
                "2021-07-01 00:04:59.999",
                "2021-07-01 00:00:00.000",
            ),
            (
                ("'5 minutes'", "ts", None),
                "2021-07-01 00:04:59.999999999",
                "2021-07-01 00:00:00.000",
            ),
            (
                ("'5 minutes'", "ts", None),
                "2021-07-01 00:04:59.999999999999",
                "2021-07-01 00:00:00.000",
            ),
            (
                ("'5 minutes'", "ts", None),
                "2021-07-01 00:04:59.999999999999999",
                "2021-07-01 00:00:00.000",
            ),
        ];
        let engine = create_test_query_engine();

        for ((interval, ts_column, origin), current_str, expected_str) in testcases {
            let sql = match origin {
                Some(origin) => format!(
                    "SELECT date_bin({}, {}, '{origin}') FROM numbers_with_ts;",
                    interval, ts_column
                ),
                None => format!(
                    "SELECT date_bin({}, {}) FROM numbers_with_ts;",
                    interval, ts_column
                ),
            };

            let substrait_plan = sql_to_substrait(engine.clone(), &sql).await;
            let mut ctx = create_test_ctx();
            let flow_plan = TypedPlan::from_substrait_plan(&mut ctx, &substrait_plan)
                .await
                .unwrap();

            // A plain projection: the root plan is expected to be an `Mfp`, presumably with
            // the `date_bin(...)` call as its first expression.
            let Plan::Mfp { mfp, .. } = flow_plan.plan else {
                unreachable!()
            };
            let expr = mfp.expressions[0].clone();

            let current = common_time::Timestamp::from_str(current_str, None).unwrap();

            // Column 1 of `numbers_with_ts` is the `ts` column these queries reference.
            let actual = find_time_window_lower_bound(&expr, 1, current).unwrap();

            let expected = Some(common_time::Timestamp::from_str(expected_str, None).unwrap());

            assert_eq!(actual, expected);
        }
    }
}