flow/batching_mode/
time_window.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Time window expr and helper functions
16//!
17
18use std::collections::BTreeSet;
19use std::sync::Arc;
20
21use api::helper::pb_value_to_value_ref;
22use arrow::array::{
23    TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
24    TimestampSecondArray,
25};
26use catalog::CatalogManagerRef;
27use common_error::ext::BoxedError;
28use common_recordbatch::DfRecordBatch;
29use common_telemetry::warn;
30use common_time::timestamp::TimeUnit;
31use common_time::Timestamp;
32use datafusion::error::Result as DfResult;
33use datafusion::execution::SessionState;
34use datafusion::logical_expr::Expr;
35use datafusion::physical_planner::{DefaultPhysicalPlanner, PhysicalPlanner};
36use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRecursion, TreeNodeRewriter};
37use datafusion_common::{DFSchema, TableReference};
38use datafusion_expr::{ColumnarValue, LogicalPlan};
39use datafusion_physical_expr::PhysicalExprRef;
40use datatypes::prelude::{ConcreteDataType, DataType};
41use datatypes::schema::TIME_INDEX_KEY;
42use datatypes::value::Value;
43use datatypes::vectors::{
44    TimestampMicrosecondVector, TimestampMillisecondVector, TimestampNanosecondVector,
45    TimestampSecondVector, Vector,
46};
47use itertools::Itertools;
48use session::context::QueryContextRef;
49use snafu::{ensure, OptionExt, ResultExt};
50
51use crate::adapter::util::from_proto_to_data_type;
52use crate::error::{
53    ArrowSnafu, DatafusionSnafu, DatatypesSnafu, ExternalSnafu, PlanSnafu, UnexpectedSnafu,
54};
55use crate::expr::error::DataTypeSnafu;
56use crate::Error;
57
58/// Time window expr like `date_bin(INTERVAL '1' MINUTE, ts)`, this type help with
59/// evaluating the expr using given timestamp
60///
61/// The time window expr must satisfies following conditions:
62/// 1. The expr must be monotonic non-decreasing
63/// 2. The expr must only have one and only one input column with timestamp type, and the output column must be timestamp type
64/// 3. The expr must be deterministic
65///
66/// An example of time window expr is `date_bin(INTERVAL '1' MINUTE, ts)`
67#[derive(Debug, Clone)]
68pub struct TimeWindowExpr {
69    phy_expr: PhysicalExprRef,
70    pub column_name: String,
71    logical_expr: Expr,
72    df_schema: DFSchema,
73}
74
75impl std::fmt::Display for TimeWindowExpr {
76    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
77        f.debug_struct("TimeWindowExpr")
78            .field("phy_expr", &self.phy_expr.to_string())
79            .field("column_name", &self.column_name)
80            .field("logical_expr", &self.logical_expr.to_string())
81            .field("df_schema", &self.df_schema)
82            .finish()
83    }
84}
85
86impl TimeWindowExpr {
87    pub fn from_expr(
88        expr: &Expr,
89        column_name: &str,
90        df_schema: &DFSchema,
91        session: &SessionState,
92    ) -> Result<Self, Error> {
93        let phy_expr: PhysicalExprRef = to_phy_expr(expr, df_schema, session)?;
94        Ok(Self {
95            phy_expr,
96            column_name: column_name.to_string(),
97            logical_expr: expr.clone(),
98            df_schema: df_schema.clone(),
99        })
100    }
101
102    pub fn eval(
103        &self,
104        current: Timestamp,
105    ) -> Result<(Option<Timestamp>, Option<Timestamp>), Error> {
106        let lower_bound =
107            calc_expr_time_window_lower_bound(&self.phy_expr, &self.df_schema, current)?;
108        let upper_bound =
109            probe_expr_time_window_upper_bound(&self.phy_expr, &self.df_schema, current)?;
110        Ok((lower_bound, upper_bound))
111    }
112
113    /// Find timestamps from rows using time window expr
114    ///
115    /// use column of name `self.column_name` from input rows list as input to time window expr
116    pub async fn handle_rows(
117        &self,
118        rows_list: Vec<api::v1::Rows>,
119    ) -> Result<BTreeSet<Timestamp>, Error> {
120        let mut time_windows = BTreeSet::new();
121
122        for rows in rows_list {
123            // pick the time index column and use it to eval on `self.expr`
124            // TODO(discord9): handle case where time index column is not present(i.e. DEFAULT constant value)
125            let ts_col_index = rows
126                .schema
127                .iter()
128                .map(|col| col.column_name.clone())
129                .position(|name| name == self.column_name);
130            let Some(ts_col_index) = ts_col_index else {
131                warn!("can't found time index column in schema: {:?}", rows.schema);
132                continue;
133            };
134            let col_schema = &rows.schema[ts_col_index];
135            let cdt = from_proto_to_data_type(col_schema)?;
136
137            let mut vector = cdt.create_mutable_vector(rows.rows.len());
138            for row in rows.rows {
139                let value = pb_value_to_value_ref(&row.values[ts_col_index], &None);
140                vector.try_push_value_ref(value).context(DataTypeSnafu {
141                    msg: "Failed to convert rows to columns",
142                })?;
143            }
144            let vector = vector.to_vector();
145
146            let df_schema = create_df_schema_for_ts_column(&self.column_name, cdt)?;
147
148            let rb =
149                DfRecordBatch::try_new(df_schema.inner().clone(), vec![vector.to_arrow_array()])
150                    .with_context(|_e| ArrowSnafu {
151                        context: format!(
152                            "Failed to create record batch from {df_schema:?} and {vector:?}"
153                        ),
154                    })?;
155
156            let eval_res = self
157                .phy_expr
158                .evaluate(&rb)
159                .with_context(|_| DatafusionSnafu {
160                    context: format!(
161                        "Failed to evaluate physical expression {:?} on {rb:?}",
162                        self.phy_expr
163                    ),
164                })?;
165
166            let res = columnar_to_ts_vector(&eval_res)?;
167
168            for ts in res.into_iter().flatten() {
169                time_windows.insert(ts);
170            }
171        }
172
173        Ok(time_windows)
174    }
175}
176
177fn create_df_schema_for_ts_column(name: &str, cdt: ConcreteDataType) -> Result<DFSchema, Error> {
178    let arrow_schema = Arc::new(arrow_schema::Schema::new(vec![arrow_schema::Field::new(
179        name,
180        cdt.as_arrow_type(),
181        false,
182    )]));
183
184    let df_schema = DFSchema::from_field_specific_qualified_schema(
185        vec![Some(TableReference::bare("TimeIndexOnlyTable"))],
186        &arrow_schema,
187    )
188    .with_context(|_e| DatafusionSnafu {
189        context: format!("Failed to create DFSchema from arrow schema {arrow_schema:?}"),
190    })?;
191
192    Ok(df_schema)
193}
194
195/// Convert `ColumnarValue` to `Vec<Option<Timestamp>>`
196fn columnar_to_ts_vector(columnar: &ColumnarValue) -> Result<Vec<Option<Timestamp>>, Error> {
197    let val = match columnar {
198        datafusion_expr::ColumnarValue::Array(array) => {
199            let ty = array.data_type();
200            let ty = ConcreteDataType::from_arrow_type(ty);
201            let time_unit = if let ConcreteDataType::Timestamp(ty) = ty {
202                ty.unit()
203            } else {
204                return UnexpectedSnafu {
205                    reason: format!("Non-timestamp type: {ty:?}"),
206                }
207                .fail();
208            };
209
210            match time_unit {
211                TimeUnit::Second => array
212                    .as_ref()
213                    .as_any()
214                    .downcast_ref::<TimestampSecondArray>()
215                    .with_context(|| PlanSnafu {
216                        reason: format!("Failed to create vector from arrow array {array:?}"),
217                    })?
218                    .values()
219                    .iter()
220                    .map(|d| Some(Timestamp::new(*d, time_unit)))
221                    .collect_vec(),
222                TimeUnit::Millisecond => array
223                    .as_ref()
224                    .as_any()
225                    .downcast_ref::<TimestampMillisecondArray>()
226                    .with_context(|| PlanSnafu {
227                        reason: format!("Failed to create vector from arrow array {array:?}"),
228                    })?
229                    .values()
230                    .iter()
231                    .map(|d| Some(Timestamp::new(*d, time_unit)))
232                    .collect_vec(),
233                TimeUnit::Microsecond => array
234                    .as_ref()
235                    .as_any()
236                    .downcast_ref::<TimestampMicrosecondArray>()
237                    .with_context(|| PlanSnafu {
238                        reason: format!("Failed to create vector from arrow array {array:?}"),
239                    })?
240                    .values()
241                    .iter()
242                    .map(|d| Some(Timestamp::new(*d, time_unit)))
243                    .collect_vec(),
244                TimeUnit::Nanosecond => array
245                    .as_ref()
246                    .as_any()
247                    .downcast_ref::<TimestampNanosecondArray>()
248                    .with_context(|| PlanSnafu {
249                        reason: format!("Failed to create vector from arrow array {array:?}"),
250                    })?
251                    .values()
252                    .iter()
253                    .map(|d| Some(Timestamp::new(*d, time_unit)))
254                    .collect_vec(),
255            }
256        }
257        datafusion_expr::ColumnarValue::Scalar(scalar) => {
258            let value = Value::try_from(scalar.clone()).with_context(|_| DatatypesSnafu {
259                extra: format!("Failed to convert scalar {scalar:?} to value"),
260            })?;
261            let ts = value.as_timestamp().context(UnexpectedSnafu {
262                reason: format!("Expect Timestamp, found {:?}", value),
263            })?;
264            vec![Some(ts)]
265        }
266    };
267    Ok(val)
268}
269
270/// Return (`the column name of time index column`, `the time window expr`, `the expected time unit of time index column`, `the expr's schema for evaluating the time window`)
271///
272/// The time window expr is expected to have one input column with Timestamp type, and also return Timestamp type, the time window expr is expected
273/// to be monotonic increasing and appears in the innermost GROUP BY clause
274///
275/// note this plan should only contain one TableScan
276pub async fn find_time_window_expr(
277    plan: &LogicalPlan,
278    catalog_man: CatalogManagerRef,
279    query_ctx: QueryContextRef,
280) -> Result<(String, Option<datafusion_expr::Expr>, TimeUnit, DFSchema), Error> {
281    // TODO(discord9): find the expr that do time window
282
283    let mut table_name = None;
284
285    // first find the table source in the logical plan
286    plan.apply(|plan| {
287        let LogicalPlan::TableScan(table_scan) = plan else {
288            return Ok(TreeNodeRecursion::Continue);
289        };
290        table_name = Some(table_scan.table_name.clone());
291        Ok(TreeNodeRecursion::Stop)
292    })
293    .with_context(|_| DatafusionSnafu {
294        context: format!("Can't find table source in plan {plan:?}"),
295    })?;
296    let Some(table_name) = table_name else {
297        UnexpectedSnafu {
298            reason: format!("Can't find table source in plan {plan:?}"),
299        }
300        .fail()?
301    };
302
303    let current_schema = query_ctx.current_schema();
304
305    let catalog_name = table_name.catalog().unwrap_or(query_ctx.current_catalog());
306    let schema_name = table_name.schema().unwrap_or(&current_schema);
307    let table_name = table_name.table();
308
309    let Some(table_ref) = catalog_man
310        .table(catalog_name, schema_name, table_name, Some(&query_ctx))
311        .await
312        .map_err(BoxedError::new)
313        .context(ExternalSnafu)?
314    else {
315        UnexpectedSnafu {
316            reason: format!(
317                "Can't find table {table_name:?} in catalog {catalog_name:?}/{schema_name:?}"
318            ),
319        }
320        .fail()?
321    };
322
323    let schema = &table_ref.table_info().meta.schema;
324
325    let ts_index = schema.timestamp_column().with_context(|| UnexpectedSnafu {
326        reason: format!("Can't find timestamp column in table {table_name:?}"),
327    })?;
328
329    let ts_col_name = ts_index.name.clone();
330
331    let expected_time_unit = ts_index.data_type.as_timestamp().with_context(|| UnexpectedSnafu {
332        reason: format!(
333            "Expected timestamp column {ts_col_name:?} in table {table_name:?} to be timestamp, but got {ts_index:?}"
334        ),
335    })?.unit();
336
337    let arrow_schema = Arc::new(arrow_schema::Schema::new(vec![arrow_schema::Field::new(
338        ts_col_name.clone(),
339        ts_index.data_type.as_arrow_type(),
340        false,
341    )]));
342
343    let df_schema = DFSchema::from_field_specific_qualified_schema(
344        vec![Some(TableReference::bare(table_name))],
345        &arrow_schema,
346    )
347    .with_context(|_e| DatafusionSnafu {
348        context: format!("Failed to create DFSchema from arrow schema {arrow_schema:?}"),
349    })?;
350
351    // find the time window expr which refers to the time index column
352    let mut aggr_expr = None;
353    let mut time_window_expr: Option<Expr> = None;
354
355    let find_inner_aggr_expr = |plan: &LogicalPlan| {
356        if let LogicalPlan::Aggregate(aggregate) = plan {
357            aggr_expr = Some(aggregate.clone());
358        };
359
360        Ok(TreeNodeRecursion::Continue)
361    };
362    plan.apply(find_inner_aggr_expr)
363        .with_context(|_| DatafusionSnafu {
364            context: format!("Can't find aggr expr in plan {plan:?}"),
365        })?;
366
367    if let Some(aggregate) = aggr_expr {
368        for group_expr in &aggregate.group_expr {
369            let refs = group_expr.column_refs();
370            if refs.len() != 1 {
371                continue;
372            }
373            let ref_col = refs.iter().next().unwrap();
374
375            let index = aggregate.input.schema().maybe_index_of_column(ref_col);
376            let Some(index) = index else {
377                continue;
378            };
379            let field = aggregate.input.schema().field(index);
380
381            // TODO(discord9): need to ensure the field has the meta key for the time index
382            let is_time_index =
383                field.metadata().get(TIME_INDEX_KEY).map(|s| s.as_str()) == Some("true");
384
385            if is_time_index {
386                let rewrite_column = group_expr.clone();
387                let rewritten = rewrite_column
388                    .rewrite(&mut RewriteColumn {
389                        table_name: table_name.to_string(),
390                    })
391                    .with_context(|_| DatafusionSnafu {
392                        context: format!("Rewrite expr failed, expr={:?}", group_expr),
393                    })?
394                    .data;
395                struct RewriteColumn {
396                    table_name: String,
397                }
398
399                impl TreeNodeRewriter for RewriteColumn {
400                    type Node = Expr;
401                    fn f_down(&mut self, node: Self::Node) -> DfResult<Transformed<Self::Node>> {
402                        let Expr::Column(mut column) = node else {
403                            return Ok(Transformed::no(node));
404                        };
405
406                        column.relation = Some(TableReference::bare(self.table_name.clone()));
407
408                        Ok(Transformed::yes(Expr::Column(column)))
409                    }
410                }
411
412                time_window_expr = Some(rewritten);
413                break;
414            }
415        }
416        Ok((ts_col_name, time_window_expr, expected_time_unit, df_schema))
417    } else {
418        // can't found time window expr, return None
419        Ok((ts_col_name, None, expected_time_unit, df_schema))
420    }
421}
422
/// Find the bounds of the time window that `current` falls into, using the time window
/// expr found in the given `plan`.
/// i.e. for time window expr being `date_bin(INTERVAL '5 minutes', ts) as time_window` and `current="2021-07-01 00:01:01.000"`,
/// the lower bound is `Some("2021-07-01 00:00:00.000")`.
/// If no time window expr is found in `plan`, both bounds are `None`.
///
/// Returns (`time index column name`, `lower bound`, `upper bound`).
///
/// Time window expr is a expr that:
/// 1. ref only to a time index column
/// 2. is monotonic increasing
/// 3. show up in GROUP BY clause
///
/// note this plan should only contain one TableScan
#[cfg(test)]
pub async fn find_plan_time_window_bound(
    plan: &LogicalPlan,
    current: Timestamp,
    query_ctx: QueryContextRef,
    engine: query::QueryEngineRef,
) -> Result<(String, Option<Timestamp>, Option<Timestamp>), Error> {
    // TODO(discord9): find the expr that do time window
    let catalog_man = engine.engine_state().catalog_manager().clone();

    let (ts_col_name, time_window_expr, expected_time_unit, df_schema) =
        find_time_window_expr(plan, catalog_man, query_ctx).await?;

    // cast current to ts_index's type
    let new_current = current
        .convert_to(expected_time_unit)
        .with_context(|| UnexpectedSnafu {
            reason: format!("Failed to cast current timestamp {current:?} to {expected_time_unit}"),
        })?;

    // if no time_window_expr is found, return None
    let Some(time_window_expr) = time_window_expr else {
        return Ok((ts_col_name, None, None));
    };

    let session_state = engine.engine_state().session_state();
    let phy_expr = to_phy_expr(&time_window_expr, &df_schema, &session_state)?;

    let lower_bound = calc_expr_time_window_lower_bound(&phy_expr, &df_schema, new_current)?;
    let upper_bound = probe_expr_time_window_upper_bound(&phy_expr, &df_schema, new_current)?;
    Ok((ts_col_name, lower_bound, upper_bound))
}
467
468/// Find the lower bound of time window in given `expr` and `current` timestamp.
469///
470/// i.e. for `current="2021-07-01 00:01:01.000"` and `expr=date_bin(INTERVAL '5 minutes', ts) as time_window` and `ts_col=ts`,
471/// return `Some("2021-07-01 00:00:00.000")` since it's the lower bound
472/// return `Some("2021-07-01 00:00:00.000")` since it's the lower bound
473/// of current time window given the current timestamp
474///
475/// if return None, meaning this time window have no lower bound
476fn calc_expr_time_window_lower_bound(
477    phy_expr: &PhysicalExprRef,
478    df_schema: &DFSchema,
479    current: Timestamp,
480) -> Result<Option<Timestamp>, Error> {
481    let cur_time_window = eval_phy_time_window_expr(phy_expr, df_schema, current)?;
482    let input_time_unit = cur_time_window.unit();
483    Ok(cur_time_window.convert_to(input_time_unit))
484}
485
486/// Probe for the upper bound for time window expression
487fn probe_expr_time_window_upper_bound(
488    phy_expr: &PhysicalExprRef,
489    df_schema: &DFSchema,
490    current: Timestamp,
491) -> Result<Option<Timestamp>, Error> {
492    // TODO(discord9): special handling `date_bin` for faster path
493    use std::cmp::Ordering;
494
495    let cur_time_window = eval_phy_time_window_expr(phy_expr, df_schema, current)?;
496
497    // search to find the lower bound
498    let mut offset: i64 = 1;
499    let mut lower_bound = Some(current);
500    let upper_bound;
501    // first expontial probe to found a range for binary search
502    loop {
503        let Some(next_val) = current.value().checked_add(offset) else {
504            // no upper bound if overflow, which is ok
505            return Ok(None);
506        };
507
508        let next_time_probe = common_time::Timestamp::new(next_val, current.unit());
509
510        let next_time_window = eval_phy_time_window_expr(phy_expr, df_schema, next_time_probe)?;
511
512        match next_time_window.cmp(&cur_time_window) {
513            Ordering::Less => UnexpectedSnafu {
514                    reason: format!(
515                        "Unsupported time window expression, expect monotonic increasing for time window expression {phy_expr:?}"
516                    ),
517                }
518                .fail()?,
519            Ordering::Equal => {
520                lower_bound = Some(next_time_probe);
521            }
522            Ordering::Greater => {
523                upper_bound = Some(next_time_probe);
524                break
525            }
526        }
527
528        let Some(new_offset) = offset.checked_mul(2) else {
529            // no upper bound if overflow
530            return Ok(None);
531        };
532        offset = new_offset;
533    }
534
535    // binary search for the exact upper bound
536
537    binary_search_expr(
538        lower_bound,
539        upper_bound,
540        cur_time_window,
541        phy_expr,
542        df_schema,
543    )
544    .map(Some)
545}
546
547fn binary_search_expr(
548    lower_bound: Option<Timestamp>,
549    upper_bound: Option<Timestamp>,
550    cur_time_window: Timestamp,
551    phy_expr: &PhysicalExprRef,
552    df_schema: &DFSchema,
553) -> Result<Timestamp, Error> {
554    ensure!(lower_bound.map(|v|v.unit()) == upper_bound.map(|v| v.unit()), UnexpectedSnafu {
555        reason: format!(" unit mismatch for time window expression {phy_expr:?}, found {lower_bound:?} and {upper_bound:?}"),
556    });
557
558    let output_unit = upper_bound
559        .context(UnexpectedSnafu {
560            reason: "should have lower bound",
561        })?
562        .unit();
563
564    let mut low = lower_bound
565        .context(UnexpectedSnafu {
566            reason: "should have lower bound",
567        })?
568        .value();
569    let mut high = upper_bound
570        .context(UnexpectedSnafu {
571            reason: "should have upper bound",
572        })?
573        .value();
574    while low < high {
575        let mid = (low + high) / 2;
576        let mid_probe = common_time::Timestamp::new(mid, output_unit);
577        let mid_time_window = eval_phy_time_window_expr(phy_expr, df_schema, mid_probe)?;
578
579        match mid_time_window.cmp(&cur_time_window) {
580            std::cmp::Ordering::Less => UnexpectedSnafu {
581                reason: format!("Binary search failed for time window expression {phy_expr:?}"),
582            }
583            .fail()?,
584            std::cmp::Ordering::Equal => low = mid + 1,
585            std::cmp::Ordering::Greater => high = mid,
586        }
587    }
588
589    let final_upper_bound_for_time_window = common_time::Timestamp::new(high, output_unit);
590    Ok(final_upper_bound_for_time_window)
591}
592
593/// Expect the `phy` expression only have one input column with Timestamp type, and also return Timestamp type
594fn eval_phy_time_window_expr(
595    phy: &PhysicalExprRef,
596    df_schema: &DFSchema,
597    input_value: Timestamp,
598) -> Result<Timestamp, Error> {
599    let schema_ty = df_schema.field(0).data_type();
600    let schema_cdt = ConcreteDataType::from_arrow_type(schema_ty);
601    let schema_unit = if let ConcreteDataType::Timestamp(ts) = schema_cdt {
602        ts.unit()
603    } else {
604        return UnexpectedSnafu {
605            reason: format!("Expect Timestamp, found {:?}", schema_cdt),
606        }
607        .fail();
608    };
609    let input_value = input_value
610        .convert_to(schema_unit)
611        .with_context(|| UnexpectedSnafu {
612            reason: format!("Failed to convert timestamp {input_value:?} to {schema_unit}"),
613        })?;
614    let ts_vector = match schema_unit {
615        TimeUnit::Second => {
616            TimestampSecondVector::from_vec(vec![input_value.value()]).to_arrow_array()
617        }
618        TimeUnit::Millisecond => {
619            TimestampMillisecondVector::from_vec(vec![input_value.value()]).to_arrow_array()
620        }
621        TimeUnit::Microsecond => {
622            TimestampMicrosecondVector::from_vec(vec![input_value.value()]).to_arrow_array()
623        }
624        TimeUnit::Nanosecond => {
625            TimestampNanosecondVector::from_vec(vec![input_value.value()]).to_arrow_array()
626        }
627    };
628
629    let rb = DfRecordBatch::try_new(df_schema.inner().clone(), vec![ts_vector.clone()])
630        .with_context(|_| ArrowSnafu {
631            context: format!("Failed to create record batch from {df_schema:?} and {ts_vector:?}"),
632        })?;
633
634    let eval_res = phy.evaluate(&rb).with_context(|_| DatafusionSnafu {
635        context: format!("Failed to evaluate physical expression {phy:?} on {rb:?}"),
636    })?;
637
638    if let Some(Some(ts)) = columnar_to_ts_vector(&eval_res)?.first() {
639        Ok(*ts)
640    } else {
641        UnexpectedSnafu {
642            reason: format!(
643                "Expected timestamp in expression {phy:?} but got {:?}",
644                eval_res
645            ),
646        }
647        .fail()?
648    }
649}
650
651fn to_phy_expr(
652    expr: &Expr,
653    df_schema: &DFSchema,
654    session: &SessionState,
655) -> Result<PhysicalExprRef, Error> {
656    let phy_planner = DefaultPhysicalPlanner::default();
657
658    let phy_expr: PhysicalExprRef = phy_planner
659        .create_physical_expr(expr, df_schema, session)
660        .with_context(|_e| DatafusionSnafu {
661            context: format!(
662                "Failed to create physical expression from {expr:?} using {df_schema:?}"
663            ),
664        })?;
665    Ok(phy_expr)
666}
667
668#[cfg(test)]
669mod test {
670    use datafusion_common::tree_node::TreeNode;
671    use pretty_assertions::assert_eq;
672    use session::context::QueryContext;
673
674    use super::*;
675    use crate::batching_mode::utils::{df_plan_to_sql, sql_to_df_plan, AddFilterRewriter};
676    use crate::test_utils::create_test_query_engine;
677
678    #[tokio::test]
679    async fn test_plan_time_window_lower_bound() {
680        use datafusion_expr::{col, lit};
681        let query_engine = create_test_query_engine();
682        let ctx = QueryContext::arc();
683
684        let testcases = [
685            // same alias is not same column
686            (
687                "SELECT arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)') AS ts FROM numbers_with_ts GROUP BY ts;",
688                Timestamp::new(1740394109, TimeUnit::Second),
689                (
690                    "ts".to_string(),
691                    Some(Timestamp::new(1740394109000, TimeUnit::Millisecond)),
692                    Some(Timestamp::new(1740394109001, TimeUnit::Millisecond)),
693                ),
694                r#"SELECT arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)') AS ts FROM numbers_with_ts WHERE ((ts >= CAST('2025-02-24 10:48:29' AS TIMESTAMP)) AND (ts <= CAST('2025-02-24 10:48:29.001' AS TIMESTAMP))) GROUP BY numbers_with_ts.ts"#
695            ),
696            // complex time window index
697            (
698                "SELECT arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)') AS time_window FROM numbers_with_ts GROUP BY time_window;",
699                Timestamp::new(1740394109, TimeUnit::Second),
700                (
701                    "ts".to_string(),
702                    Some(Timestamp::new(1740394080, TimeUnit::Second)),
703                    Some(Timestamp::new(1740394140, TimeUnit::Second)),
704                ),
705                "SELECT arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)') AS time_window FROM numbers_with_ts WHERE ((ts >= CAST('2025-02-24 10:48:00' AS TIMESTAMP)) AND (ts <= CAST('2025-02-24 10:49:00' AS TIMESTAMP))) GROUP BY arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)')"
706            ),
707            // complex time window index with where
708            (
709                "SELECT arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)') AS time_window FROM numbers_with_ts WHERE number in (2, 3, 4) GROUP BY time_window;",
710                Timestamp::new(1740394109, TimeUnit::Second),
711                (
712                    "ts".to_string(),
713                    Some(Timestamp::new(1740394080, TimeUnit::Second)),
714                    Some(Timestamp::new(1740394140, TimeUnit::Second)),
715                ),
716                "SELECT arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)') AS time_window FROM numbers_with_ts WHERE numbers_with_ts.number IN (2, 3, 4) AND ((ts >= CAST('2025-02-24 10:48:00' AS TIMESTAMP)) AND (ts <= CAST('2025-02-24 10:49:00' AS TIMESTAMP))) GROUP BY arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)')"
717            ),
718            // complex time window index with between and
719            (
720                "SELECT arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)') AS time_window FROM numbers_with_ts WHERE number BETWEEN 2 AND 4 GROUP BY time_window;",
721                Timestamp::new(1740394109, TimeUnit::Second),
722                (
723                    "ts".to_string(),
724                    Some(Timestamp::new(1740394080, TimeUnit::Second)),
725                    Some(Timestamp::new(1740394140, TimeUnit::Second)),
726                ),
727                "SELECT arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)') AS time_window FROM numbers_with_ts WHERE (numbers_with_ts.number BETWEEN 2 AND 4) AND ((ts >= CAST('2025-02-24 10:48:00' AS TIMESTAMP)) AND (ts <= CAST('2025-02-24 10:49:00' AS TIMESTAMP))) GROUP BY arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)')"
728            ),
729            // no time index
730            (
731                "SELECT date_bin('5 minutes', ts) FROM numbers_with_ts;",
732                Timestamp::new(23, TimeUnit::Millisecond),
733                ("ts".to_string(), None, None),
734                "SELECT date_bin('5 minutes', ts) FROM numbers_with_ts;"
735            ),
736            // time index
737            (
738                "SELECT date_bin('5 minutes', ts) as time_window FROM numbers_with_ts GROUP BY time_window;",
739                Timestamp::new(23, TimeUnit::Nanosecond),
740                (
741                    "ts".to_string(),
742                    Some(Timestamp::new(0, TimeUnit::Millisecond)),
743                    Some(Timestamp::new(300000, TimeUnit::Millisecond)),
744                ),
745                "SELECT date_bin('5 minutes', numbers_with_ts.ts) AS time_window FROM numbers_with_ts WHERE ((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts <= CAST('1970-01-01 00:05:00' AS TIMESTAMP))) GROUP BY date_bin('5 minutes', numbers_with_ts.ts)"
746            ),
747            // on spot
748            (
749                "SELECT date_bin('5 minutes', ts) as time_window FROM numbers_with_ts GROUP BY time_window;",
750                Timestamp::new(0, TimeUnit::Nanosecond),
751                (
752                    "ts".to_string(),
753                    Some(Timestamp::new(0, TimeUnit::Millisecond)),
754                    Some(Timestamp::new(300000, TimeUnit::Millisecond)),
755                ),
756                "SELECT date_bin('5 minutes', numbers_with_ts.ts) AS time_window FROM numbers_with_ts WHERE ((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts <= CAST('1970-01-01 00:05:00' AS TIMESTAMP))) GROUP BY date_bin('5 minutes', numbers_with_ts.ts)"
757            ),
758            // different time unit
759            (
760                "SELECT date_bin('5 minutes', ts) as time_window FROM numbers_with_ts GROUP BY time_window;",
761                Timestamp::new(23_000_000, TimeUnit::Nanosecond),
762                (
763                    "ts".to_string(),
764                    Some(Timestamp::new(0, TimeUnit::Millisecond)),
765                    Some(Timestamp::new(300000, TimeUnit::Millisecond)),
766                ),
767                "SELECT date_bin('5 minutes', numbers_with_ts.ts) AS time_window FROM numbers_with_ts WHERE ((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts <= CAST('1970-01-01 00:05:00' AS TIMESTAMP))) GROUP BY date_bin('5 minutes', numbers_with_ts.ts)"
768            ),
769            // time index with other fields
770            (
771                "SELECT sum(number) as sum_up, date_bin('5 minutes', ts) as time_window FROM numbers_with_ts GROUP BY time_window;",
772                Timestamp::new(23, TimeUnit::Millisecond),
773                (
774                    "ts".to_string(),
775                    Some(Timestamp::new(0, TimeUnit::Millisecond)),
776                    Some(Timestamp::new(300000, TimeUnit::Millisecond)),
777                ),
778                "SELECT sum(numbers_with_ts.number) AS sum_up, date_bin('5 minutes', numbers_with_ts.ts) AS time_window FROM numbers_with_ts WHERE ((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts <= CAST('1970-01-01 00:05:00' AS TIMESTAMP))) GROUP BY date_bin('5 minutes', numbers_with_ts.ts)"
779            ),
780            // time index with other pks
781            (
782                "SELECT number, date_bin('5 minutes', ts) as time_window FROM numbers_with_ts GROUP BY time_window, number;",
783                Timestamp::new(23, TimeUnit::Millisecond),
784                (
785                    "ts".to_string(),
786                    Some(Timestamp::new(0, TimeUnit::Millisecond)),
787                    Some(Timestamp::new(300000, TimeUnit::Millisecond)),
788                ),
789                "SELECT numbers_with_ts.number, date_bin('5 minutes', numbers_with_ts.ts) AS time_window FROM numbers_with_ts WHERE ((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts <= CAST('1970-01-01 00:05:00' AS TIMESTAMP))) GROUP BY date_bin('5 minutes', numbers_with_ts.ts), numbers_with_ts.number"
790            ),
791            // subquery
792            (
793                "SELECT number, time_window FROM (SELECT number, date_bin('5 minutes', ts) as time_window FROM numbers_with_ts GROUP BY time_window, number);",
794                Timestamp::new(23, TimeUnit::Millisecond),
795                (
796                    "ts".to_string(),
797                    Some(Timestamp::new(0, TimeUnit::Millisecond)),
798                    Some(Timestamp::new(300000, TimeUnit::Millisecond)),
799                ),
800                "SELECT numbers_with_ts.number, time_window FROM (SELECT numbers_with_ts.number, date_bin('5 minutes', numbers_with_ts.ts) AS time_window FROM numbers_with_ts WHERE ((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts <= CAST('1970-01-01 00:05:00' AS TIMESTAMP))) GROUP BY date_bin('5 minutes', numbers_with_ts.ts), numbers_with_ts.number)"
801            ),
802            // cte
803            (
804                "with cte as (select number, date_bin('5 minutes', ts) as time_window from numbers_with_ts GROUP BY time_window, number) select number, time_window from cte;",
805                Timestamp::new(23, TimeUnit::Millisecond),
806                (
807                    "ts".to_string(),
808                    Some(Timestamp::new(0, TimeUnit::Millisecond)),
809                    Some(Timestamp::new(300000, TimeUnit::Millisecond)),
810                ),
811                "SELECT cte.number, cte.time_window FROM (SELECT numbers_with_ts.number, date_bin('5 minutes', numbers_with_ts.ts) AS time_window FROM numbers_with_ts WHERE ((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts <= CAST('1970-01-01 00:05:00' AS TIMESTAMP))) GROUP BY date_bin('5 minutes', numbers_with_ts.ts), numbers_with_ts.number) AS cte"
812            ),
813            // complex subquery without alias
814            (
815                "SELECT sum(number), number, date_bin('5 minutes', ts) as time_window, bucket_name FROM (SELECT number, ts, case when number < 5 THEN 'bucket_0_5' when number >= 5 THEN 'bucket_5_inf' END as bucket_name FROM numbers_with_ts) GROUP BY number, time_window, bucket_name;",
816                Timestamp::new(23, TimeUnit::Millisecond),
817                (
818                    "ts".to_string(),
819                    Some(Timestamp::new(0, TimeUnit::Millisecond)),
820                    Some(Timestamp::new(300000, TimeUnit::Millisecond)),
821                ),
822                "SELECT sum(numbers_with_ts.number), numbers_with_ts.number, date_bin('5 minutes', numbers_with_ts.ts) AS time_window, bucket_name FROM (SELECT numbers_with_ts.number, numbers_with_ts.ts, CASE WHEN (numbers_with_ts.number < 5) THEN 'bucket_0_5' WHEN (numbers_with_ts.number >= 5) THEN 'bucket_5_inf' END AS bucket_name FROM numbers_with_ts WHERE ((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts <= CAST('1970-01-01 00:05:00' AS TIMESTAMP)))) GROUP BY numbers_with_ts.number, date_bin('5 minutes', numbers_with_ts.ts), bucket_name"
823            ),
824            // complex subquery alias
825            (
826                "SELECT sum(number), number, date_bin('5 minutes', ts) as time_window, bucket_name FROM (SELECT number, ts, case when number < 5 THEN 'bucket_0_5' when number >= 5 THEN 'bucket_5_inf' END as bucket_name FROM numbers_with_ts) as cte GROUP BY number, time_window, bucket_name;",
827                Timestamp::new(23, TimeUnit::Millisecond),
828                (
829                    "ts".to_string(),
830                    Some(Timestamp::new(0, TimeUnit::Millisecond)),
831                    Some(Timestamp::new(300000, TimeUnit::Millisecond)),
832                ),
833                "SELECT sum(cte.number), cte.number, date_bin('5 minutes', cte.ts) AS time_window, cte.bucket_name FROM (SELECT numbers_with_ts.number, numbers_with_ts.ts, CASE WHEN (numbers_with_ts.number < 5) THEN 'bucket_0_5' WHEN (numbers_with_ts.number >= 5) THEN 'bucket_5_inf' END AS bucket_name FROM numbers_with_ts WHERE ((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts <= CAST('1970-01-01 00:05:00' AS TIMESTAMP)))) AS cte GROUP BY cte.number, date_bin('5 minutes', cte.ts), cte.bucket_name"
834            ),
835        ];
836
837        for (sql, current, expected, expected_unparsed) in testcases {
838            let plan = sql_to_df_plan(ctx.clone(), query_engine.clone(), sql, true)
839                .await
840                .unwrap();
841
842            let real =
843                find_plan_time_window_bound(&plan, current, ctx.clone(), query_engine.clone())
844                    .await
845                    .unwrap();
846            assert_eq!(expected, real);
847
848            let plan = sql_to_df_plan(ctx.clone(), query_engine.clone(), sql, false)
849                .await
850                .unwrap();
851            let (col_name, lower, upper) = real;
852            let new_sql = if lower.is_some() {
853                let to_df_literal = |value| {
854                    let value = Value::from(value);
855
856                    value.try_to_scalar_value(&value.data_type()).unwrap()
857                };
858                let lower = to_df_literal(lower.unwrap());
859                let upper = to_df_literal(upper.unwrap());
860                let expr = col(&col_name)
861                    .gt_eq(lit(lower))
862                    .and(col(&col_name).lt_eq(lit(upper)));
863                let mut add_filter = AddFilterRewriter::new(expr);
864                let plan = plan.rewrite(&mut add_filter).unwrap().data;
865                df_plan_to_sql(&plan).unwrap()
866            } else {
867                sql.to_string()
868            };
869            assert_eq!(expected_unparsed, new_sql);
870        }
871    }
872}