Skip to main content

flow/batching_mode/
time_window.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Time window expr and helper functions
16//!
17
18use std::collections::BTreeSet;
19use std::sync::Arc;
20
21use api::helper::pb_value_to_value_ref;
22use arrow::array::{
23    TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
24    TimestampSecondArray,
25};
26use catalog::CatalogManagerRef;
27use common_error::ext::BoxedError;
28use common_recordbatch::DfRecordBatch;
29use common_telemetry::warn;
30use common_time::Timestamp;
31use common_time::timestamp::TimeUnit;
32use datafusion::error::Result as DfResult;
33use datafusion::execution::SessionState;
34use datafusion::logical_expr::Expr;
35use datafusion::physical_planner::{DefaultPhysicalPlanner, PhysicalPlanner};
36use datafusion_common::tree_node::{
37    Transformed, TreeNode, TreeNodeRecursion, TreeNodeRewriter, TreeNodeVisitor,
38};
39use datafusion_common::{DFSchema, TableReference};
40use datafusion_expr::{ColumnarValue, LogicalPlan};
41use datafusion_physical_expr::PhysicalExprRef;
42use datatypes::prelude::{ConcreteDataType, DataType};
43use datatypes::schema::TIME_INDEX_KEY;
44use datatypes::value::Value;
45use datatypes::vectors::{
46    TimestampMicrosecondVector, TimestampMillisecondVector, TimestampNanosecondVector,
47    TimestampSecondVector, Vector,
48};
49use itertools::Itertools;
50use session::context::QueryContextRef;
51use snafu::{OptionExt, ResultExt, ensure};
52
53use crate::Error;
54use crate::adapter::util::from_proto_to_data_type;
55use crate::error::{
56    ArrowSnafu, DatafusionSnafu, DatatypesSnafu, ExternalSnafu, PlanSnafu, TimeSnafu,
57    UnexpectedSnafu,
58};
59use crate::expr::error::DataTypeSnafu;
60
61/// Represents a test timestamp in seconds since the Unix epoch.
62const DEFAULT_TEST_TIMESTAMP: Timestamp = Timestamp::new_second(17_0000_0000);
63
/// Counters and flags describing the shape of a logical plan, used to decide
/// whether the plan is eligible for time window expr (TWE) based pruning.
#[derive(Default)]
struct TimeWindowPlanShape {
    /// Number of `LogicalPlan::TableScan` nodes counted so far.
    table_scan_count: usize,
    /// Number of `LogicalPlan::Aggregate` nodes counted so far.
    aggregate_count: usize,
    /// Set once a plan node kind that rules out pruning has been seen.
    has_unsupported_pruning_node: bool,
}
70
71impl TimeWindowPlanShape {
72    fn should_skip_time_window_expr(&self) -> bool {
73        self.has_unsupported_pruning_node || self.table_scan_count != 1 || self.aggregate_count > 1
74    }
75
76    fn should_stop_inspection(&self) -> bool {
77        // This intentionally differs from the final skip predicate above:
78        // zero table scans make a fully inspected plan ineligible, but they are
79        // not an early-stop condition because a later subtree may still contain
80        // the first table scan.
81        self.has_unsupported_pruning_node || self.table_scan_count > 1 || self.aggregate_count > 1
82    }
83}
84
85impl TreeNodeVisitor<'_> for TimeWindowPlanShape {
86    type Node = LogicalPlan;
87
88    fn f_down(&mut self, node: &Self::Node) -> DfResult<TreeNodeRecursion> {
89        match node {
90            LogicalPlan::TableScan(_) => {
91                self.table_scan_count += 1;
92            }
93            LogicalPlan::Aggregate(_) => {
94                self.aggregate_count += 1;
95            }
96            // The pinned DataFusion fork has no separate
97            // `LogicalPlan::CrossJoin` variant. SQL `CROSS JOIN` is represented
98            // as `LogicalPlan::Join(_)` here, so rejecting all joins also
99            // rejects cross joins.
100            LogicalPlan::Join(_)
101            | LogicalPlan::Window(_)
102            | LogicalPlan::Union(_)
103            | LogicalPlan::Distinct(_)
104            | LogicalPlan::Limit(_)
105            | LogicalPlan::Sort(_)
106            | LogicalPlan::Extension(_)
107            | LogicalPlan::Dml(_)
108            | LogicalPlan::Ddl(_)
109            | LogicalPlan::Unnest(_)
110            | LogicalPlan::RecursiveQuery(_) => {
111                self.has_unsupported_pruning_node = true;
112            }
113            _ => {}
114        }
115
116        // These disqualifying conditions are monotonic. Once any of them is
117        // met, later traversal cannot make the plan eligible for TWE pruning
118        // again. Counts may be partial after `Stop`, but they are only used by
119        // the final skip predicate.
120        if self.should_stop_inspection() {
121            Ok(TreeNodeRecursion::Stop)
122        } else {
123            Ok(TreeNodeRecursion::Continue)
124        }
125    }
126}
127
128fn inspect_time_window_plan_shape(plan: &LogicalPlan) -> DfResult<TimeWindowPlanShape> {
129    let mut shape = TimeWindowPlanShape::default();
130    plan.visit_with_subqueries(&mut shape)?;
131    Ok(shape)
132}
133
134fn should_skip_time_window_expr(plan: &LogicalPlan) -> bool {
135    let Ok(shape) = inspect_time_window_plan_shape(plan) else {
136        return true;
137    };
138
139    shape.should_skip_time_window_expr()
140}
141
/// Time window expr like `date_bin(INTERVAL '1' MINUTE, ts)`; this type helps with
/// evaluating the expr using a given timestamp
///
/// The time window expr must satisfy the following conditions:
/// 1. The expr must be monotonic non-decreasing
/// 2. The expr must have one and only one input column with timestamp type, and the output column must be timestamp type
/// 3. The expr must be deterministic
///
/// An example of time window expr is `date_bin(INTERVAL '1' MINUTE, ts)`
#[derive(Debug, Clone)]
pub struct TimeWindowExpr {
    /// Physical expression planned from `logical_expr` against `df_schema`.
    phy_expr: PhysicalExprRef,
    /// Name of the input column; `handle_rows` looks this column up in incoming rows.
    pub column_name: String,
    /// The logical expression this time window expr was built from.
    logical_expr: Expr,
    /// Schema used when evaluating `phy_expr`.
    df_schema: DFSchema,
    /// Window size measured by `from_expr` at a test timestamp, if both bounds were found.
    eval_time_window_size: Option<std::time::Duration>,
    /// Lower bound of the window containing the test timestamp, as measured by `from_expr`.
    eval_time_original: Option<Timestamp>,
}
160
161impl std::fmt::Display for TimeWindowExpr {
162    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
163        f.debug_struct("TimeWindowExpr")
164            .field("phy_expr", &self.phy_expr.to_string())
165            .field("column_name", &self.column_name)
166            .field("logical_expr", &self.logical_expr.to_string())
167            .field("df_schema", &self.df_schema)
168            .finish()
169    }
170}
171
172impl TimeWindowExpr {
173    /// The time window size of the expr, get from calling `eval` with a test timestamp
174    pub fn time_window_size(&self) -> &Option<std::time::Duration> {
175        &self.eval_time_window_size
176    }
177
178    pub fn from_expr(
179        expr: &Expr,
180        column_name: &str,
181        df_schema: &DFSchema,
182        session: &SessionState,
183    ) -> Result<Self, Error> {
184        let phy_expr: PhysicalExprRef = to_phy_expr(expr, df_schema, session)?;
185        let mut zelf = Self {
186            phy_expr,
187            column_name: column_name.to_string(),
188            logical_expr: expr.clone(),
189            df_schema: df_schema.clone(),
190            eval_time_window_size: None,
191            eval_time_original: None,
192        };
193        let test_ts = DEFAULT_TEST_TIMESTAMP;
194        let (lower, upper) = zelf.eval(test_ts)?;
195        let time_window_size = match (lower, upper) {
196            (Some(l), Some(u)) => u.sub(&l).map(|r| r.to_std()).transpose().map_err(|_| {
197                UnexpectedSnafu {
198                    reason: format!(
199                        "Expect upper bound older than lower bound, found upper={u:?} and lower={l:?}"
200                    ),
201                }
202                .build()
203            })?,
204            _ => None,
205        };
206        zelf.eval_time_window_size = time_window_size;
207        zelf.eval_time_original = lower;
208
209        Ok(zelf)
210    }
211
212    /// TODO(discord9): add `eval_batch` too
213    pub fn eval(
214        &self,
215        current: Timestamp,
216    ) -> Result<(Option<Timestamp>, Option<Timestamp>), Error> {
217        fn compute_distance(time_diff_ns: i64, stride_ns: i64) -> i64 {
218            if stride_ns == 0 {
219                return time_diff_ns;
220            }
221            // a - (a % n) impl ceil to nearest n * stride
222            let time_delta = time_diff_ns - (time_diff_ns % stride_ns);
223
224            if time_diff_ns < 0 && time_delta != time_diff_ns {
225                // The origin is later than the source timestamp, round down to the previous bin
226
227                time_delta - stride_ns
228            } else {
229                time_delta
230            }
231        }
232
233        // FAST PATH: if we have eval_time_original and eval_time_window_size,
234        // we can compute the bounds directly
235        if let (Some(original), Some(window_size)) =
236            (self.eval_time_original, self.eval_time_window_size)
237        {
238            // date_bin align current to lower bound
239            let time_diff_ns = current.sub(&original).and_then(|s|s.num_nanoseconds()).with_context(||UnexpectedSnafu {
240                reason: format!(
241                    "Failed to compute time difference between current {current:?} and original {original:?}"
242                ),
243            })?;
244
245            let window_size_ns = window_size.as_nanos() as i64;
246
247            let distance_ns = compute_distance(time_diff_ns, window_size_ns);
248
249            let lower_bound = if distance_ns >= 0 {
250                original.add_duration(std::time::Duration::from_nanos(distance_ns as u64))
251            } else {
252                original.sub_duration(std::time::Duration::from_nanos((-distance_ns) as u64))
253            }
254            .context(TimeSnafu)?;
255            let upper_bound = lower_bound.add_duration(window_size).context(TimeSnafu)?;
256
257            return Ok((Some(lower_bound), Some(upper_bound)));
258        }
259
260        let lower_bound =
261            calc_expr_time_window_lower_bound(&self.phy_expr, &self.df_schema, current)?;
262        let upper_bound =
263            probe_expr_time_window_upper_bound(&self.phy_expr, &self.df_schema, current)?;
264        Ok((lower_bound, upper_bound))
265    }
266
267    /// Find timestamps from rows using time window expr
268    ///
269    /// use column of name `self.column_name` from input rows list as input to time window expr
270    pub async fn handle_rows(
271        &self,
272        rows_list: Vec<api::v1::Rows>,
273    ) -> Result<BTreeSet<Timestamp>, Error> {
274        let mut time_windows = BTreeSet::new();
275
276        for rows in rows_list {
277            // pick the time index column and use it to eval on `self.expr`
278            // TODO(discord9): handle case where time index column is not present(i.e. DEFAULT constant value)
279            let ts_col_index = rows
280                .schema
281                .iter()
282                .map(|col| col.column_name.clone())
283                .position(|name| name == self.column_name);
284            let Some(ts_col_index) = ts_col_index else {
285                warn!("can't found time index column in schema: {:?}", rows.schema);
286                continue;
287            };
288            let col_schema = &rows.schema[ts_col_index];
289            let cdt = from_proto_to_data_type(col_schema)?;
290
291            let mut vector = cdt.create_mutable_vector(rows.rows.len());
292            for row in rows.rows {
293                let value = pb_value_to_value_ref(&row.values[ts_col_index], None);
294                vector.try_push_value_ref(&value).context(DataTypeSnafu {
295                    msg: "Failed to convert rows to columns",
296                })?;
297            }
298            let vector = vector.to_vector();
299
300            let df_schema = create_df_schema_for_ts_column(&self.column_name, cdt)?;
301
302            let rb =
303                DfRecordBatch::try_new(df_schema.inner().clone(), vec![vector.to_arrow_array()])
304                    .with_context(|_e| ArrowSnafu {
305                        context: format!(
306                            "Failed to create record batch from {df_schema:?} and {vector:?}"
307                        ),
308                    })?;
309
310            let eval_res = self
311                .phy_expr
312                .evaluate(&rb)
313                .with_context(|_| DatafusionSnafu {
314                    context: format!(
315                        "Failed to evaluate physical expression {:?} on {rb:?}",
316                        self.phy_expr
317                    ),
318                })?;
319
320            let res = columnar_to_ts_vector(&eval_res)?;
321
322            for ts in res.into_iter().flatten() {
323                time_windows.insert(ts);
324            }
325        }
326
327        Ok(time_windows)
328    }
329}
330
331fn create_df_schema_for_ts_column(name: &str, cdt: ConcreteDataType) -> Result<DFSchema, Error> {
332    let arrow_schema = Arc::new(arrow_schema::Schema::new(vec![arrow_schema::Field::new(
333        name,
334        cdt.as_arrow_type(),
335        false,
336    )]));
337
338    let df_schema = DFSchema::from_field_specific_qualified_schema(
339        vec![Some(TableReference::bare("TimeIndexOnlyTable"))],
340        &arrow_schema,
341    )
342    .with_context(|_e| DatafusionSnafu {
343        context: format!("Failed to create DFSchema from arrow schema {arrow_schema:?}"),
344    })?;
345
346    Ok(df_schema)
347}
348
349/// Convert `ColumnarValue` to `Vec<Option<Timestamp>>`
350fn columnar_to_ts_vector(columnar: &ColumnarValue) -> Result<Vec<Option<Timestamp>>, Error> {
351    let val = match columnar {
352        datafusion_expr::ColumnarValue::Array(array) => {
353            let ty = array.data_type();
354            let ty = ConcreteDataType::from_arrow_type(ty);
355            let time_unit = if let ConcreteDataType::Timestamp(ty) = ty {
356                ty.unit()
357            } else {
358                return UnexpectedSnafu {
359                    reason: format!("Non-timestamp type: {ty:?}"),
360                }
361                .fail();
362            };
363
364            match time_unit {
365                TimeUnit::Second => array
366                    .as_ref()
367                    .as_any()
368                    .downcast_ref::<TimestampSecondArray>()
369                    .with_context(|| PlanSnafu {
370                        reason: format!("Failed to create vector from arrow array {array:?}"),
371                    })?
372                    .values()
373                    .iter()
374                    .map(|d| Some(Timestamp::new(*d, time_unit)))
375                    .collect_vec(),
376                TimeUnit::Millisecond => array
377                    .as_ref()
378                    .as_any()
379                    .downcast_ref::<TimestampMillisecondArray>()
380                    .with_context(|| PlanSnafu {
381                        reason: format!("Failed to create vector from arrow array {array:?}"),
382                    })?
383                    .values()
384                    .iter()
385                    .map(|d| Some(Timestamp::new(*d, time_unit)))
386                    .collect_vec(),
387                TimeUnit::Microsecond => array
388                    .as_ref()
389                    .as_any()
390                    .downcast_ref::<TimestampMicrosecondArray>()
391                    .with_context(|| PlanSnafu {
392                        reason: format!("Failed to create vector from arrow array {array:?}"),
393                    })?
394                    .values()
395                    .iter()
396                    .map(|d| Some(Timestamp::new(*d, time_unit)))
397                    .collect_vec(),
398                TimeUnit::Nanosecond => array
399                    .as_ref()
400                    .as_any()
401                    .downcast_ref::<TimestampNanosecondArray>()
402                    .with_context(|| PlanSnafu {
403                        reason: format!("Failed to create vector from arrow array {array:?}"),
404                    })?
405                    .values()
406                    .iter()
407                    .map(|d| Some(Timestamp::new(*d, time_unit)))
408                    .collect_vec(),
409            }
410        }
411        datafusion_expr::ColumnarValue::Scalar(scalar) => {
412            let value = Value::try_from(scalar.clone()).with_context(|_| DatatypesSnafu {
413                extra: format!("Failed to convert scalar {scalar:?} to value"),
414            })?;
415            let ts = value.as_timestamp().context(UnexpectedSnafu {
416                reason: format!("Expect Timestamp, found {:?}", value),
417            })?;
418            vec![Some(ts)]
419        }
420    };
421    Ok(val)
422}
423
/// Return (`the column name of time index column`, `the time window expr`, `the expected time unit of time index column`, `the expr's schema for evaluating the time window`)
///
/// The time window expr is expected to have one input column with Timestamp type, and also return Timestamp type, the time window expr is expected
/// to be monotonic increasing and appears in the innermost GROUP BY clause
///
/// When the plan shape is not eligible (see `should_skip_time_window_expr`), the
/// time window expr is `None` and the other three tuple fields are placeholders
/// (empty column name, `TimeUnit::Millisecond`, empty schema).
///
/// # Errors
///
/// Fails if no table scan is found in the plan, if the table cannot be looked up
/// in the catalog, if the table has no timestamp column, or if building the
/// schema or rewriting the group expr fails.
///
/// note this plan should only contain one TableScan
pub async fn find_time_window_expr(
    plan: &LogicalPlan,
    catalog_man: CatalogManagerRef,
    query_ctx: QueryContextRef,
) -> Result<(String, Option<datafusion_expr::Expr>, TimeUnit, DFSchema), Error> {
    // TODO(discord9): find the expr that do time window

    // Dirty-window pruning is only safe for simple single-source aggregate plans.
    // For joins, window functions, set operations, distinct/sort/limit, extension
    // nodes, or multi-scan plans, conservatively give up and let batching mode run
    // the unfiltered full query instead.
    if should_skip_time_window_expr(plan) {
        // The column/schema/unit fields are placeholders when the TWE is None;
        // callers must only use them when a TWE is present.
        return Ok((
            String::new(),
            None,
            TimeUnit::Millisecond,
            DFSchema::empty(),
        ));
    }

    let mut table_name = None;

    // first find the table source in the logical plan; traversal stops at the
    // first `TableScan` encountered
    plan.apply(|plan| {
        let LogicalPlan::TableScan(table_scan) = plan else {
            return Ok(TreeNodeRecursion::Continue);
        };
        table_name = Some(table_scan.table_name.clone());
        Ok(TreeNodeRecursion::Stop)
    })
    .with_context(|_| DatafusionSnafu {
        context: format!("Can't find table source in plan {plan:?}"),
    })?;
    let Some(table_name) = table_name else {
        UnexpectedSnafu {
            reason: format!("Can't find table source in plan {plan:?}"),
        }
        .fail()?
    };

    let current_schema = query_ctx.current_schema();

    // Missing catalog/schema qualifiers fall back to the query context's current ones.
    let catalog_name = table_name.catalog().unwrap_or(query_ctx.current_catalog());
    let schema_name = table_name.schema().unwrap_or(&current_schema);
    let table_name = table_name.table();

    let Some(table_ref) = catalog_man
        .table(catalog_name, schema_name, table_name, Some(&query_ctx))
        .await
        .map_err(BoxedError::new)
        .context(ExternalSnafu)?
    else {
        UnexpectedSnafu {
            reason: format!(
                "Can't find table {table_name:?} in catalog {catalog_name:?}/{schema_name:?}"
            ),
        }
        .fail()?
    };

    let schema = &table_ref.table_info().meta.schema;

    let ts_index = schema.timestamp_column().with_context(|| UnexpectedSnafu {
        reason: format!("Can't find timestamp column in table {table_name:?}"),
    })?;

    let ts_col_name = ts_index.name.clone();

    let expected_time_unit = ts_index.data_type.as_timestamp().with_context(|| UnexpectedSnafu {
        reason: format!(
            "Expected timestamp column {ts_col_name:?} in table {table_name:?} to be timestamp, but got {ts_index:?}"
        ),
    })?.unit();

    // Schema with only the (non-nullable) time index column, qualified by the
    // bare table name; this is the schema returned for evaluating the TWE.
    let arrow_schema = Arc::new(arrow_schema::Schema::new(vec![arrow_schema::Field::new(
        ts_col_name.clone(),
        ts_index.data_type.as_arrow_type(),
        false,
    )]));

    let df_schema = DFSchema::from_field_specific_qualified_schema(
        vec![Some(TableReference::bare(table_name))],
        &arrow_schema,
    )
    .with_context(|_e| DatafusionSnafu {
        context: format!("Failed to create DFSchema from arrow schema {arrow_schema:?}"),
    })?;

    // find the time window expr which refers to the time index column
    let mut aggr_expr = None;
    let mut time_window_expr: Option<Expr> = None;

    // The closure never stops the traversal and overwrites `aggr_expr` on every
    // `Aggregate`, so the last `Aggregate` visited by `apply` is the one kept.
    let find_inner_aggr_expr = |plan: &LogicalPlan| {
        if let LogicalPlan::Aggregate(aggregate) = plan {
            aggr_expr = Some(aggregate.clone());
        };

        Ok(TreeNodeRecursion::Continue)
    };
    plan.apply(find_inner_aggr_expr)
        .with_context(|_| DatafusionSnafu {
            context: format!("Can't find aggr expr in plan {plan:?}"),
        })?;

    if let Some(aggregate) = aggr_expr {
        for group_expr in &aggregate.group_expr {
            // Only group exprs referring to exactly one column are candidates.
            let refs = group_expr.column_refs();
            if refs.len() != 1 {
                continue;
            }
            // Cannot panic: `refs` has exactly one element here.
            let ref_col = refs.iter().next().unwrap();

            let index = aggregate.input.schema().maybe_index_of_column(ref_col);
            let Some(index) = index else {
                continue;
            };
            let field = aggregate.input.schema().field(index);

            // TODO(discord9): need to ensure the field has the meta key for the time index
            let is_time_index =
                field.metadata().get(TIME_INDEX_KEY).map(|s| s.as_str()) == Some("true");

            if is_time_index {
                // Re-qualify every column in the expr with the bare table name so
                // that it resolves against `df_schema` built above.
                let rewrite_column = group_expr.clone();
                let rewritten = rewrite_column
                    .rewrite(&mut RewriteColumn {
                        table_name: table_name.to_string(),
                    })
                    .with_context(|_| DatafusionSnafu {
                        context: format!("Rewrite expr failed, expr={:?}", group_expr),
                    })?
                    .data;
                /// Rewriter that sets the relation of every `Expr::Column` to
                /// the bare `table_name`.
                struct RewriteColumn {
                    table_name: String,
                }

                impl TreeNodeRewriter for RewriteColumn {
                    type Node = Expr;
                    fn f_down(&mut self, node: Self::Node) -> DfResult<Transformed<Self::Node>> {
                        let Expr::Column(mut column) = node else {
                            return Ok(Transformed::no(node));
                        };

                        column.relation = Some(TableReference::bare(self.table_name.clone()));

                        Ok(Transformed::yes(Expr::Column(column)))
                    }
                }

                // The first group expr over the time index column wins.
                time_window_expr = Some(rewritten);
                break;
            }
        }
        Ok((ts_col_name, time_window_expr, expected_time_unit, df_schema))
    } else {
        // no aggregate in the plan, so there is no time window expr; return None
        Ok((ts_col_name, None, expected_time_unit, df_schema))
    }
}
591
/// Locate the time window that contains `current` for the time window expr
/// found in `plan`, returning `(time index column name, lower bound, upper bound)`.
///
/// For example, with the time window expr `date_bin(INTERVAL '5 minutes', ts) as time_window`
/// and `current="2021-07-01 00:01:01.000"`, the lower bound is
/// `Some("2021-07-01 00:00:00.000")`.
/// Both bounds are `None` when no time window expr is found in `plan`.
///
/// A time window expr is an expr that:
/// 1. refers only to a time index column
/// 2. is monotonic increasing
/// 3. shows up in the GROUP BY clause
///
/// note this plan should only contain one TableScan
#[cfg(test)]
pub async fn find_plan_time_window_bound(
    plan: &LogicalPlan,
    current: Timestamp,
    query_ctx: QueryContextRef,
    engine: query::QueryEngineRef,
) -> Result<(String, Option<Timestamp>, Option<Timestamp>), Error> {
    // TODO(discord9): find the expr that do time window
    let catalog_man = engine.engine_state().catalog_manager();
    let (ts_col_name, time_window_expr, expected_time_unit, df_schema) =
        find_time_window_expr(plan, catalog_man.clone(), query_ctx).await?;

    // Bring `current` to the time index column's unit before probing.
    let new_current = current
        .convert_to(expected_time_unit)
        .with_context(|| UnexpectedSnafu {
            reason: format!("Failed to cast current timestamp {current:?} to {expected_time_unit}"),
        })?;

    // Without a time window expr there are no bounds to report.
    let Some(time_window_expr) = time_window_expr else {
        return Ok((ts_col_name, None, None));
    };

    let phy_expr = to_phy_expr(
        &time_window_expr,
        &df_schema,
        &engine.engine_state().session_state(),
    )?;
    let lower = calc_expr_time_window_lower_bound(&phy_expr, &df_schema, new_current)?;
    let upper = probe_expr_time_window_upper_bound(&phy_expr, &df_schema, new_current)?;
    Ok((ts_col_name, lower, upper))
}
636
637/// Find the lower bound of time window in given `expr` and `current` timestamp.
638///
639/// i.e. for `current="2021-07-01 00:01:01.000"` and `expr=date_bin(INTERVAL '5 minutes', ts) as time_window` and `ts_col=ts`,
640/// return `Some("2021-07-01 00:00:00.000")` since it's the lower bound
641/// return `Some("2021-07-01 00:00:00.000")` since it's the lower bound
642/// of current time window given the current timestamp
643///
644/// if return None, meaning this time window have no lower bound
645fn calc_expr_time_window_lower_bound(
646    phy_expr: &PhysicalExprRef,
647    df_schema: &DFSchema,
648    current: Timestamp,
649) -> Result<Option<Timestamp>, Error> {
650    let cur_time_window = eval_phy_time_window_expr(phy_expr, df_schema, current)?;
651    let input_time_unit = cur_time_window.unit();
652    Ok(cur_time_window.convert_to(input_time_unit))
653}
654
/// Probe for the upper bound for time window expression
///
/// Starting from `current`, timestamps at exponentially growing offsets are
/// evaluated until one lands in a later time window; the resulting range is
/// then handed to `binary_search_expr` to pin down the bound.
///
/// Returns `Ok(None)` when the probe overflows `i64` before a later window is
/// found, and an error when a probed timestamp evaluates to an earlier window
/// than `current` does.
fn probe_expr_time_window_upper_bound(
    phy_expr: &PhysicalExprRef,
    df_schema: &DFSchema,
    current: Timestamp,
) -> Result<Option<Timestamp>, Error> {
    // TODO(discord9): special handling `date_bin` for faster path
    use std::cmp::Ordering;

    let cur_time_window = eval_phy_time_window_expr(phy_expr, df_schema, current)?;

    // `lower_bound` tracks the latest probe known to be in the current window;
    // `upper_bound` is set to the first probe found in a later window
    let mut offset: i64 = 1;
    let mut lower_bound = Some(current);
    let upper_bound;
    // first, exponential probing to find a range for the binary search
    loop {
        let Some(next_val) = current.value().checked_add(offset) else {
            // no upper bound if overflow, which is ok
            return Ok(None);
        };

        let next_time_probe = common_time::Timestamp::new(next_val, current.unit());

        let next_time_window = eval_phy_time_window_expr(phy_expr, df_schema, next_time_probe)?;

        match next_time_window.cmp(&cur_time_window) {
            // a later input mapped to an earlier window: the expr is not monotonic
            Ordering::Less => UnexpectedSnafu {
                    reason: format!(
                        "Unsupported time window expression, expect monotonic increasing for time window expression {phy_expr:?}"
                    ),
                }
                .fail()?,
            Ordering::Equal => {
                lower_bound = Some(next_time_probe);
            }
            Ordering::Greater => {
                upper_bound = Some(next_time_probe);
                break
            }
        }

        // double the offset for the next probe
        let Some(new_offset) = offset.checked_mul(2) else {
            // no upper bound if overflow
            return Ok(None);
        };
        offset = new_offset;
    }

    // binary search for the exact upper bound

    binary_search_expr(
        lower_bound,
        upper_bound,
        cur_time_window,
        phy_expr,
        df_schema,
    )
    .map(Some)
}
715
716fn binary_search_expr(
717    lower_bound: Option<Timestamp>,
718    upper_bound: Option<Timestamp>,
719    cur_time_window: Timestamp,
720    phy_expr: &PhysicalExprRef,
721    df_schema: &DFSchema,
722) -> Result<Timestamp, Error> {
723    ensure!(
724        lower_bound.map(|v| v.unit()) == upper_bound.map(|v| v.unit()),
725        UnexpectedSnafu {
726            reason: format!(
727                " unit mismatch for time window expression {phy_expr:?}, found {lower_bound:?} and {upper_bound:?}"
728            ),
729        }
730    );
731
732    let output_unit = upper_bound
733        .context(UnexpectedSnafu {
734            reason: "should have lower bound",
735        })?
736        .unit();
737
738    let mut low = lower_bound
739        .context(UnexpectedSnafu {
740            reason: "should have lower bound",
741        })?
742        .value();
743    let mut high = upper_bound
744        .context(UnexpectedSnafu {
745            reason: "should have upper bound",
746        })?
747        .value();
748    while low < high {
749        let mid = (low + high) / 2;
750        let mid_probe = common_time::Timestamp::new(mid, output_unit);
751        let mid_time_window = eval_phy_time_window_expr(phy_expr, df_schema, mid_probe)?;
752
753        match mid_time_window.cmp(&cur_time_window) {
754            std::cmp::Ordering::Less => UnexpectedSnafu {
755                reason: format!("Binary search failed for time window expression {phy_expr:?}"),
756            }
757            .fail()?,
758            std::cmp::Ordering::Equal => low = mid + 1,
759            std::cmp::Ordering::Greater => high = mid,
760        }
761    }
762
763    let final_upper_bound_for_time_window = common_time::Timestamp::new(high, output_unit);
764    Ok(final_upper_bound_for_time_window)
765}
766
767/// Expect the `phy` expression only have one input column with Timestamp type, and also return Timestamp type
768fn eval_phy_time_window_expr(
769    phy: &PhysicalExprRef,
770    df_schema: &DFSchema,
771    input_value: Timestamp,
772) -> Result<Timestamp, Error> {
773    let schema_ty = df_schema.field(0).data_type();
774    let schema_cdt = ConcreteDataType::from_arrow_type(schema_ty);
775    let schema_unit = if let ConcreteDataType::Timestamp(ts) = schema_cdt {
776        ts.unit()
777    } else {
778        return UnexpectedSnafu {
779            reason: format!("Expect Timestamp, found {:?}", schema_cdt),
780        }
781        .fail();
782    };
783    let input_value = input_value
784        .convert_to(schema_unit)
785        .with_context(|| UnexpectedSnafu {
786            reason: format!("Failed to convert timestamp {input_value:?} to {schema_unit}"),
787        })?;
788    let ts_vector = match schema_unit {
789        TimeUnit::Second => {
790            TimestampSecondVector::from_vec(vec![input_value.value()]).to_arrow_array()
791        }
792        TimeUnit::Millisecond => {
793            TimestampMillisecondVector::from_vec(vec![input_value.value()]).to_arrow_array()
794        }
795        TimeUnit::Microsecond => {
796            TimestampMicrosecondVector::from_vec(vec![input_value.value()]).to_arrow_array()
797        }
798        TimeUnit::Nanosecond => {
799            TimestampNanosecondVector::from_vec(vec![input_value.value()]).to_arrow_array()
800        }
801    };
802
803    let rb = DfRecordBatch::try_new(df_schema.inner().clone(), vec![ts_vector.clone()])
804        .with_context(|_| ArrowSnafu {
805            context: format!("Failed to create record batch from {df_schema:?} and {ts_vector:?}"),
806        })?;
807
808    let eval_res = phy.evaluate(&rb).with_context(|_| DatafusionSnafu {
809        context: format!("Failed to evaluate physical expression {phy:?} on {rb:?}"),
810    })?;
811
812    if let Some(Some(ts)) = columnar_to_ts_vector(&eval_res)?.first() {
813        Ok(*ts)
814    } else {
815        UnexpectedSnafu {
816            reason: format!(
817                "Expected timestamp in expression {phy:?} but got {:?}",
818                eval_res
819            ),
820        }
821        .fail()?
822    }
823}
824
825fn to_phy_expr(
826    expr: &Expr,
827    df_schema: &DFSchema,
828    session: &SessionState,
829) -> Result<PhysicalExprRef, Error> {
830    let phy_planner = DefaultPhysicalPlanner::default();
831
832    let phy_expr: PhysicalExprRef = phy_planner
833        .create_physical_expr(expr, df_schema, session)
834        .with_context(|_e| DatafusionSnafu {
835            context: format!(
836                "Failed to create physical expression from {expr:?} using {df_schema:?}"
837            ),
838        })?;
839    Ok(phy_expr)
840}
841
842#[cfg(test)]
843mod test {
844    use datafusion_common::tree_node::TreeNode;
845    use pretty_assertions::assert_eq;
846    use session::context::QueryContext;
847
848    use super::*;
849    use crate::batching_mode::utils::{AddFilterRewriter, df_plan_to_sql, sql_to_df_plan};
850    use crate::test_utils::create_test_query_engine;
851
852    #[tokio::test]
853    async fn test_plan_time_window_lower_bound() {
854        use datafusion_expr::{col, lit};
855        let query_engine = create_test_query_engine();
856        let ctx = QueryContext::arc();
857
858        let testcases = [
859            // same alias is not same column
860            (
861                "SELECT arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)') AS ts FROM numbers_with_ts GROUP BY ts;",
862                Timestamp::new(1740394109, TimeUnit::Second),
863                (
864                    "ts".to_string(),
865                    Some(Timestamp::new(1740394109000, TimeUnit::Millisecond)),
866                    Some(Timestamp::new(1740394109001, TimeUnit::Millisecond)),
867                ),
868                r#"SELECT arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)') AS ts FROM numbers_with_ts WHERE ((ts >= CAST('2025-02-24 10:48:29' AS TIMESTAMP)) AND (ts <= CAST('2025-02-24 10:48:29.001' AS TIMESTAMP))) GROUP BY numbers_with_ts.ts"#,
869            ),
870            // complex time window index
871            (
872                "SELECT arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)') AS time_window FROM numbers_with_ts GROUP BY time_window;",
873                Timestamp::new(1740394109, TimeUnit::Second),
874                (
875                    "ts".to_string(),
876                    Some(Timestamp::new(1740394080, TimeUnit::Second)),
877                    Some(Timestamp::new(1740394140, TimeUnit::Second)),
878                ),
879                "SELECT arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)') AS time_window FROM numbers_with_ts WHERE ((ts >= CAST('2025-02-24 10:48:00' AS TIMESTAMP)) AND (ts <= CAST('2025-02-24 10:49:00' AS TIMESTAMP))) GROUP BY arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)')",
880            ),
881            // complex time window index with where
882            (
883                "SELECT arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)') AS time_window FROM numbers_with_ts WHERE number in (2, 3, 4) GROUP BY time_window;",
884                Timestamp::new(1740394109, TimeUnit::Second),
885                (
886                    "ts".to_string(),
887                    Some(Timestamp::new(1740394080, TimeUnit::Second)),
888                    Some(Timestamp::new(1740394140, TimeUnit::Second)),
889                ),
890                "SELECT arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)') AS time_window FROM numbers_with_ts WHERE numbers_with_ts.number IN (2, 3, 4) AND ((ts >= CAST('2025-02-24 10:48:00' AS TIMESTAMP)) AND (ts <= CAST('2025-02-24 10:49:00' AS TIMESTAMP))) GROUP BY arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)')",
891            ),
892            // complex time window index with between and
893            (
894                "SELECT arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)') AS time_window FROM numbers_with_ts WHERE number BETWEEN 2 AND 4 GROUP BY time_window;",
895                Timestamp::new(1740394109, TimeUnit::Second),
896                (
897                    "ts".to_string(),
898                    Some(Timestamp::new(1740394080, TimeUnit::Second)),
899                    Some(Timestamp::new(1740394140, TimeUnit::Second)),
900                ),
901                "SELECT arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)') AS time_window FROM numbers_with_ts WHERE (numbers_with_ts.number BETWEEN 2 AND 4) AND ((ts >= CAST('2025-02-24 10:48:00' AS TIMESTAMP)) AND (ts <= CAST('2025-02-24 10:49:00' AS TIMESTAMP))) GROUP BY arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)')",
902            ),
903            // no time index
904            (
905                "SELECT date_bin('5 minutes', ts) FROM numbers_with_ts;",
906                Timestamp::new(23, TimeUnit::Millisecond),
907                ("ts".to_string(), None, None),
908                "SELECT date_bin('5 minutes', ts) FROM numbers_with_ts;",
909            ),
910            // time index
911            (
912                "SELECT date_bin('5 minutes', ts) as time_window FROM numbers_with_ts GROUP BY time_window;",
913                Timestamp::new(23, TimeUnit::Nanosecond),
914                (
915                    "ts".to_string(),
916                    Some(Timestamp::new(0, TimeUnit::Millisecond)),
917                    Some(Timestamp::new(300000, TimeUnit::Millisecond)),
918                ),
919                "SELECT date_bin('5 minutes', numbers_with_ts.ts) AS time_window FROM numbers_with_ts WHERE ((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts <= CAST('1970-01-01 00:05:00' AS TIMESTAMP))) GROUP BY date_bin('5 minutes', numbers_with_ts.ts)",
920            ),
921            // on spot
922            (
923                "SELECT date_bin('5 minutes', ts) as time_window FROM numbers_with_ts GROUP BY time_window;",
924                Timestamp::new(0, TimeUnit::Nanosecond),
925                (
926                    "ts".to_string(),
927                    Some(Timestamp::new(0, TimeUnit::Millisecond)),
928                    Some(Timestamp::new(300000, TimeUnit::Millisecond)),
929                ),
930                "SELECT date_bin('5 minutes', numbers_with_ts.ts) AS time_window FROM numbers_with_ts WHERE ((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts <= CAST('1970-01-01 00:05:00' AS TIMESTAMP))) GROUP BY date_bin('5 minutes', numbers_with_ts.ts)",
931            ),
932            // different time unit
933            (
934                "SELECT date_bin('5 minutes', ts) as time_window FROM numbers_with_ts GROUP BY time_window;",
935                Timestamp::new(23_000_000, TimeUnit::Nanosecond),
936                (
937                    "ts".to_string(),
938                    Some(Timestamp::new(0, TimeUnit::Millisecond)),
939                    Some(Timestamp::new(300000, TimeUnit::Millisecond)),
940                ),
941                "SELECT date_bin('5 minutes', numbers_with_ts.ts) AS time_window FROM numbers_with_ts WHERE ((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts <= CAST('1970-01-01 00:05:00' AS TIMESTAMP))) GROUP BY date_bin('5 minutes', numbers_with_ts.ts)",
942            ),
943            // time index with other fields
944            (
945                "SELECT sum(number) as sum_up, date_bin('5 minutes', ts) as time_window FROM numbers_with_ts GROUP BY time_window;",
946                Timestamp::new(23, TimeUnit::Millisecond),
947                (
948                    "ts".to_string(),
949                    Some(Timestamp::new(0, TimeUnit::Millisecond)),
950                    Some(Timestamp::new(300000, TimeUnit::Millisecond)),
951                ),
952                "SELECT sum(numbers_with_ts.number) AS sum_up, date_bin('5 minutes', numbers_with_ts.ts) AS time_window FROM numbers_with_ts WHERE ((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts <= CAST('1970-01-01 00:05:00' AS TIMESTAMP))) GROUP BY date_bin('5 minutes', numbers_with_ts.ts)",
953            ),
954            // time index with other pks
955            (
956                "SELECT number, date_bin('5 minutes', ts) as time_window FROM numbers_with_ts GROUP BY time_window, number;",
957                Timestamp::new(23, TimeUnit::Millisecond),
958                (
959                    "ts".to_string(),
960                    Some(Timestamp::new(0, TimeUnit::Millisecond)),
961                    Some(Timestamp::new(300000, TimeUnit::Millisecond)),
962                ),
963                "SELECT numbers_with_ts.number, date_bin('5 minutes', numbers_with_ts.ts) AS time_window FROM numbers_with_ts WHERE ((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts <= CAST('1970-01-01 00:05:00' AS TIMESTAMP))) GROUP BY date_bin('5 minutes', numbers_with_ts.ts), numbers_with_ts.number",
964            ),
965            // subquery
966            (
967                "SELECT number, time_window FROM (SELECT number, date_bin('5 minutes', ts) as time_window FROM numbers_with_ts GROUP BY time_window, number);",
968                Timestamp::new(23, TimeUnit::Millisecond),
969                (
970                    "ts".to_string(),
971                    Some(Timestamp::new(0, TimeUnit::Millisecond)),
972                    Some(Timestamp::new(300000, TimeUnit::Millisecond)),
973                ),
974                "SELECT numbers_with_ts.number, time_window FROM (SELECT numbers_with_ts.number, date_bin('5 minutes', numbers_with_ts.ts) AS time_window FROM numbers_with_ts WHERE ((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts <= CAST('1970-01-01 00:05:00' AS TIMESTAMP))) GROUP BY date_bin('5 minutes', numbers_with_ts.ts), numbers_with_ts.number)",
975            ),
976            // cte
977            (
978                "with cte as (select number, date_bin('5 minutes', ts) as time_window from numbers_with_ts GROUP BY time_window, number) select number, time_window from cte;",
979                Timestamp::new(23, TimeUnit::Millisecond),
980                (
981                    "ts".to_string(),
982                    Some(Timestamp::new(0, TimeUnit::Millisecond)),
983                    Some(Timestamp::new(300000, TimeUnit::Millisecond)),
984                ),
985                "SELECT cte.number, cte.time_window FROM (SELECT numbers_with_ts.number, date_bin('5 minutes', numbers_with_ts.ts) AS time_window FROM numbers_with_ts WHERE ((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts <= CAST('1970-01-01 00:05:00' AS TIMESTAMP))) GROUP BY date_bin('5 minutes', numbers_with_ts.ts), numbers_with_ts.number) AS cte",
986            ),
987            // complex subquery without alias
988            (
989                "SELECT sum(number), number, date_bin('5 minutes', ts) as time_window, bucket_name FROM (SELECT number, ts, case when number < 5 THEN 'bucket_0_5' when number >= 5 THEN 'bucket_5_inf' END as bucket_name FROM numbers_with_ts) GROUP BY number, time_window, bucket_name;",
990                Timestamp::new(23, TimeUnit::Millisecond),
991                (
992                    "ts".to_string(),
993                    Some(Timestamp::new(0, TimeUnit::Millisecond)),
994                    Some(Timestamp::new(300000, TimeUnit::Millisecond)),
995                ),
996                "SELECT sum(numbers_with_ts.number), numbers_with_ts.number, date_bin('5 minutes', numbers_with_ts.ts) AS time_window, bucket_name FROM (SELECT numbers_with_ts.number, numbers_with_ts.ts, CASE WHEN (numbers_with_ts.number < 5) THEN 'bucket_0_5' WHEN (numbers_with_ts.number >= 5) THEN 'bucket_5_inf' END AS bucket_name FROM numbers_with_ts WHERE ((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts <= CAST('1970-01-01 00:05:00' AS TIMESTAMP)))) GROUP BY numbers_with_ts.number, date_bin('5 minutes', numbers_with_ts.ts), bucket_name",
997            ),
998            // complex subquery alias
999            (
1000                "SELECT sum(number), number, date_bin('5 minutes', ts) as time_window, bucket_name FROM (SELECT number, ts, case when number < 5 THEN 'bucket_0_5' when number >= 5 THEN 'bucket_5_inf' END as bucket_name FROM numbers_with_ts) as cte GROUP BY number, time_window, bucket_name;",
1001                Timestamp::new(23, TimeUnit::Millisecond),
1002                (
1003                    "ts".to_string(),
1004                    Some(Timestamp::new(0, TimeUnit::Millisecond)),
1005                    Some(Timestamp::new(300000, TimeUnit::Millisecond)),
1006                ),
1007                "SELECT sum(cte.number), cte.number, date_bin('5 minutes', cte.ts) AS time_window, cte.bucket_name FROM (SELECT numbers_with_ts.number, numbers_with_ts.ts, CASE WHEN (numbers_with_ts.number < 5) THEN 'bucket_0_5' WHEN (numbers_with_ts.number >= 5) THEN 'bucket_5_inf' END AS bucket_name FROM numbers_with_ts WHERE ((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts <= CAST('1970-01-01 00:05:00' AS TIMESTAMP)))) AS cte GROUP BY cte.number, date_bin('5 minutes', cte.ts), cte.bucket_name",
1008            ),
1009        ];
1010
1011        for (sql, current, expected, expected_unparsed) in testcases {
1012            let plan = sql_to_df_plan(ctx.clone(), query_engine.clone(), sql, true)
1013                .await
1014                .unwrap();
1015
1016            let real =
1017                find_plan_time_window_bound(&plan, current, ctx.clone(), query_engine.clone())
1018                    .await
1019                    .unwrap();
1020            assert_eq!(expected, real);
1021
1022            let plan = sql_to_df_plan(ctx.clone(), query_engine.clone(), sql, false)
1023                .await
1024                .unwrap();
1025            let (col_name, lower, upper) = real;
1026            let new_sql = if let Some(lower) = lower {
1027                let to_df_literal = |value| {
1028                    let value = Value::from(value);
1029
1030                    value.try_to_scalar_value(&value.data_type()).unwrap()
1031                };
1032                let lower = to_df_literal(lower);
1033                let upper = to_df_literal(upper.unwrap());
1034                let expr = col(&col_name)
1035                    .gt_eq(lit(lower))
1036                    .and(col(&col_name).lt_eq(lit(upper)));
1037                let mut add_filter = AddFilterRewriter::new(expr);
1038                let plan = plan.rewrite(&mut add_filter).unwrap().data;
1039                df_plan_to_sql(&plan).unwrap()
1040            } else {
1041                sql.to_string()
1042            };
1043            assert_eq!(expected_unparsed, new_sql);
1044        }
1045    }
1046
1047    #[tokio::test]
1048    async fn test_complex_plans_skip_time_window_expr() {
1049        let query_engine = create_test_query_engine();
1050        let ctx = QueryContext::arc();
1051
1052        let testcases = [
1053            // A join may duplicate or drop rows across sources, so a time window
1054            // found on one side is not safe to use as a full-query dirty-window
1055            // pruning boundary.
1056            r#"
1057SELECT
1058    l.number,
1059    date_bin('5 minutes', l.ts) AS time_window
1060FROM numbers_with_ts l
1061JOIN numbers_with_ts r ON l.number = r.number
1062GROUP BY l.number, time_window
1063"#,
1064            // Window functions can depend on rows outside the dirty window,
1065            // even if their input contains a group-by time window.
1066            r#"
1067SELECT number, time_window
1068FROM (
1069    SELECT
1070        number,
1071        time_window,
1072        row_number() OVER (PARTITION BY number ORDER BY time_window DESC) AS rn
1073    FROM (
1074        SELECT number, date_bin('5 minutes', ts) AS time_window
1075        FROM numbers_with_ts
1076        GROUP BY number, time_window
1077    )
1078)
1079WHERE rn = 1
1080"#,
1081            // Set operations combine multiple query scopes/sources.
1082            r#"
1083SELECT date_bin('5 minutes', ts) AS time_window
1084FROM numbers_with_ts
1085GROUP BY time_window
1086UNION ALL
1087SELECT date_bin('5 minutes', ts) AS time_window
1088FROM numbers_with_ts
1089GROUP BY time_window
1090"#,
1091            // Nested aggregates are unsafe: pruning source rows by the inner
1092            // time window is not equivalent for the outer/global aggregate.
1093            r#"
1094SELECT max(cnt)
1095FROM (
1096    SELECT date_bin('5 minutes', ts) AS time_window, count(number) AS cnt
1097    FROM numbers_with_ts
1098    GROUP BY time_window
1099)
1100"#,
1101            // Expression subqueries add another query scope/source and should
1102            // not be treated as a simple single-source TWE plan.
1103            r#"
1104SELECT date_bin('5 minutes', ts) AS time_window
1105FROM numbers_with_ts
1106WHERE number IN (SELECT number FROM numbers_with_ts)
1107GROUP BY time_window
1108"#,
1109            // Sorting an otherwise valid TWE query is conservatively treated
1110            // as full-query to avoid adding dirty-window predicates across
1111            // post-aggregate plan nodes.
1112            r#"
1113SELECT date_bin('5 minutes', ts) AS time_window
1114FROM numbers_with_ts
1115GROUP BY time_window
1116ORDER BY time_window
1117"#,
1118            // DISTINCT is a post-query de-duplication boundary; keep it on the
1119            // full-query path even if its input has a time window group key.
1120            r#"
1121SELECT DISTINCT time_window
1122FROM (
1123    SELECT date_bin('5 minutes', ts) AS time_window
1124    FROM numbers_with_ts
1125    GROUP BY time_window
1126)
1127"#,
1128            // LIMIT can change which rows are visible after pruning.
1129            r#"
1130SELECT date_bin('5 minutes', ts) AS time_window
1131FROM numbers_with_ts
1132GROUP BY time_window
1133LIMIT 10
1134"#,
1135            // Cross joins may appear either as a dedicated node or as multiple
1136            // table scans; either way they must not use source dirty-window
1137            // pruning from one side only.
1138            r#"
1139SELECT date_bin('5 minutes', l.ts) AS time_window
1140FROM numbers_with_ts l, numbers_with_ts r
1141GROUP BY time_window
1142"#,
1143            // A cross join with a constant relation still has only one table
1144            // scan, so it must be rejected by the join node instead of relying
1145            // on the multi-scan guard.
1146            r#"
1147SELECT date_bin('5 minutes', l.ts) AS time_window
1148FROM numbers_with_ts l CROSS JOIN (VALUES (1)) AS v(x)
1149GROUP BY time_window
1150"#,
1151        ];
1152
1153        for sql in testcases {
1154            let plan = sql_to_df_plan(ctx.clone(), query_engine.clone(), sql, true)
1155                .await
1156                .unwrap();
1157            let (_, lower, upper) = find_plan_time_window_bound(
1158                &plan,
1159                Timestamp::new(23, TimeUnit::Millisecond),
1160                ctx.clone(),
1161                query_engine.clone(),
1162            )
1163            .await
1164            .unwrap();
1165
1166            assert_eq!(None, lower, "query should not have TWE: {sql}");
1167            assert_eq!(None, upper, "query should not have TWE: {sql}");
1168        }
1169    }
1170
1171    #[tokio::test]
1172    async fn test_simple_single_source_aggregate_keeps_time_window_expr() {
1173        let query_engine = create_test_query_engine();
1174        let ctx = QueryContext::arc();
1175
1176        let sql = r#"
1177SELECT max(number) AS max_number, date_bin('5 minutes', ts) AS time_window
1178FROM numbers_with_ts
1179GROUP BY time_window
1180"#;
1181        let plan = sql_to_df_plan(ctx.clone(), query_engine.clone(), sql, true)
1182            .await
1183            .unwrap();
1184        let (_, lower, upper) = find_plan_time_window_bound(
1185            &plan,
1186            Timestamp::new(23, TimeUnit::Millisecond),
1187            ctx.clone(),
1188            query_engine.clone(),
1189        )
1190        .await
1191        .unwrap();
1192
1193        assert_eq!(Some(Timestamp::new(0, TimeUnit::Millisecond)), lower);
1194        assert_eq!(Some(Timestamp::new(300000, TimeUnit::Millisecond)), upper);
1195    }
1196}