flow/batching_mode/
time_window.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Time window expr and helper functions
16//!
17
18use std::collections::BTreeSet;
19use std::sync::Arc;
20
21use api::helper::pb_value_to_value_ref;
22use arrow::array::{
23    TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
24    TimestampSecondArray,
25};
26use catalog::CatalogManagerRef;
27use common_error::ext::BoxedError;
28use common_recordbatch::DfRecordBatch;
29use common_telemetry::warn;
30use common_time::timestamp::TimeUnit;
31use common_time::Timestamp;
32use datafusion::error::Result as DfResult;
33use datafusion::execution::SessionState;
34use datafusion::logical_expr::Expr;
35use datafusion::physical_planner::{DefaultPhysicalPlanner, PhysicalPlanner};
36use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRecursion, TreeNodeRewriter};
37use datafusion_common::{DFSchema, TableReference};
38use datafusion_expr::{ColumnarValue, LogicalPlan};
39use datafusion_physical_expr::PhysicalExprRef;
40use datatypes::prelude::{ConcreteDataType, DataType};
41use datatypes::schema::TIME_INDEX_KEY;
42use datatypes::value::Value;
43use datatypes::vectors::{
44    TimestampMicrosecondVector, TimestampMillisecondVector, TimestampNanosecondVector,
45    TimestampSecondVector, Vector,
46};
47use itertools::Itertools;
48use session::context::QueryContextRef;
49use snafu::{ensure, OptionExt, ResultExt};
50
51use crate::adapter::util::from_proto_to_data_type;
52use crate::error::{
53    ArrowSnafu, DatafusionSnafu, DatatypesSnafu, ExternalSnafu, PlanSnafu, UnexpectedSnafu,
54};
55use crate::expr::error::DataTypeSnafu;
56use crate::Error;
57
58/// Represents a test timestamp in seconds since the Unix epoch.
59const DEFAULT_TEST_TIMESTAMP: Timestamp = Timestamp::new_second(17_0000_0000);
60
/// Time window expr like `date_bin(INTERVAL '1' MINUTE, ts)`; this type helps with
/// evaluating the expr using a given timestamp.
///
/// The time window expr must satisfy the following conditions:
/// 1. The expr must be monotonic non-decreasing
/// 2. The expr must have one and only one input column with timestamp type, and the output column must be timestamp type
/// 3. The expr must be deterministic
///
/// An example of time window expr is `date_bin(INTERVAL '1' MINUTE, ts)`
#[derive(Debug, Clone)]
pub struct TimeWindowExpr {
    /// Physical form of `logical_expr`, planned against `df_schema`.
    phy_expr: PhysicalExprRef,
    /// Name of the input column that is looked up in incoming rows.
    pub column_name: String,
    /// The logical expression this time window expr was built from.
    logical_expr: Expr,
    /// Schema the expr is evaluated against; the evaluation helpers read only its first field.
    df_schema: DFSchema,
    /// Window size measured once at construction; `None` if it could not be determined.
    eval_time_window_size: Option<std::time::Duration>,
}
78
79impl std::fmt::Display for TimeWindowExpr {
80    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
81        f.debug_struct("TimeWindowExpr")
82            .field("phy_expr", &self.phy_expr.to_string())
83            .field("column_name", &self.column_name)
84            .field("logical_expr", &self.logical_expr.to_string())
85            .field("df_schema", &self.df_schema)
86            .finish()
87    }
88}
89
90impl TimeWindowExpr {
91    /// The time window size of the expr, get from calling `eval` with a test timestamp
92    pub fn time_window_size(&self) -> &Option<std::time::Duration> {
93        &self.eval_time_window_size
94    }
95
96    pub fn from_expr(
97        expr: &Expr,
98        column_name: &str,
99        df_schema: &DFSchema,
100        session: &SessionState,
101    ) -> Result<Self, Error> {
102        let phy_expr: PhysicalExprRef = to_phy_expr(expr, df_schema, session)?;
103        let mut zelf = Self {
104            phy_expr,
105            column_name: column_name.to_string(),
106            logical_expr: expr.clone(),
107            df_schema: df_schema.clone(),
108            eval_time_window_size: None,
109        };
110        let test_ts = DEFAULT_TEST_TIMESTAMP;
111        let (l, u) = zelf.eval(test_ts)?;
112        let time_window_size = match (l, u) {
113            (Some(l), Some(u)) => u.sub(&l).map(|r| r.to_std()).transpose().map_err(|_| {
114                UnexpectedSnafu {
115                    reason: format!(
116                        "Expect upper bound older than lower bound, found upper={u:?} and lower={l:?}"
117                    ),
118                }
119                .build()
120            })?,
121            _ => None,
122        };
123        zelf.eval_time_window_size = time_window_size;
124        Ok(zelf)
125    }
126
127    pub fn eval(
128        &self,
129        current: Timestamp,
130    ) -> Result<(Option<Timestamp>, Option<Timestamp>), Error> {
131        let lower_bound =
132            calc_expr_time_window_lower_bound(&self.phy_expr, &self.df_schema, current)?;
133        let upper_bound =
134            probe_expr_time_window_upper_bound(&self.phy_expr, &self.df_schema, current)?;
135        Ok((lower_bound, upper_bound))
136    }
137
138    /// Find timestamps from rows using time window expr
139    ///
140    /// use column of name `self.column_name` from input rows list as input to time window expr
141    pub async fn handle_rows(
142        &self,
143        rows_list: Vec<api::v1::Rows>,
144    ) -> Result<BTreeSet<Timestamp>, Error> {
145        let mut time_windows = BTreeSet::new();
146
147        for rows in rows_list {
148            // pick the time index column and use it to eval on `self.expr`
149            // TODO(discord9): handle case where time index column is not present(i.e. DEFAULT constant value)
150            let ts_col_index = rows
151                .schema
152                .iter()
153                .map(|col| col.column_name.clone())
154                .position(|name| name == self.column_name);
155            let Some(ts_col_index) = ts_col_index else {
156                warn!("can't found time index column in schema: {:?}", rows.schema);
157                continue;
158            };
159            let col_schema = &rows.schema[ts_col_index];
160            let cdt = from_proto_to_data_type(col_schema)?;
161
162            let mut vector = cdt.create_mutable_vector(rows.rows.len());
163            for row in rows.rows {
164                let value = pb_value_to_value_ref(&row.values[ts_col_index], &None);
165                vector.try_push_value_ref(value).context(DataTypeSnafu {
166                    msg: "Failed to convert rows to columns",
167                })?;
168            }
169            let vector = vector.to_vector();
170
171            let df_schema = create_df_schema_for_ts_column(&self.column_name, cdt)?;
172
173            let rb =
174                DfRecordBatch::try_new(df_schema.inner().clone(), vec![vector.to_arrow_array()])
175                    .with_context(|_e| ArrowSnafu {
176                        context: format!(
177                            "Failed to create record batch from {df_schema:?} and {vector:?}"
178                        ),
179                    })?;
180
181            let eval_res = self
182                .phy_expr
183                .evaluate(&rb)
184                .with_context(|_| DatafusionSnafu {
185                    context: format!(
186                        "Failed to evaluate physical expression {:?} on {rb:?}",
187                        self.phy_expr
188                    ),
189                })?;
190
191            let res = columnar_to_ts_vector(&eval_res)?;
192
193            for ts in res.into_iter().flatten() {
194                time_windows.insert(ts);
195            }
196        }
197
198        Ok(time_windows)
199    }
200}
201
202fn create_df_schema_for_ts_column(name: &str, cdt: ConcreteDataType) -> Result<DFSchema, Error> {
203    let arrow_schema = Arc::new(arrow_schema::Schema::new(vec![arrow_schema::Field::new(
204        name,
205        cdt.as_arrow_type(),
206        false,
207    )]));
208
209    let df_schema = DFSchema::from_field_specific_qualified_schema(
210        vec![Some(TableReference::bare("TimeIndexOnlyTable"))],
211        &arrow_schema,
212    )
213    .with_context(|_e| DatafusionSnafu {
214        context: format!("Failed to create DFSchema from arrow schema {arrow_schema:?}"),
215    })?;
216
217    Ok(df_schema)
218}
219
220/// Convert `ColumnarValue` to `Vec<Option<Timestamp>>`
221fn columnar_to_ts_vector(columnar: &ColumnarValue) -> Result<Vec<Option<Timestamp>>, Error> {
222    let val = match columnar {
223        datafusion_expr::ColumnarValue::Array(array) => {
224            let ty = array.data_type();
225            let ty = ConcreteDataType::from_arrow_type(ty);
226            let time_unit = if let ConcreteDataType::Timestamp(ty) = ty {
227                ty.unit()
228            } else {
229                return UnexpectedSnafu {
230                    reason: format!("Non-timestamp type: {ty:?}"),
231                }
232                .fail();
233            };
234
235            match time_unit {
236                TimeUnit::Second => array
237                    .as_ref()
238                    .as_any()
239                    .downcast_ref::<TimestampSecondArray>()
240                    .with_context(|| PlanSnafu {
241                        reason: format!("Failed to create vector from arrow array {array:?}"),
242                    })?
243                    .values()
244                    .iter()
245                    .map(|d| Some(Timestamp::new(*d, time_unit)))
246                    .collect_vec(),
247                TimeUnit::Millisecond => array
248                    .as_ref()
249                    .as_any()
250                    .downcast_ref::<TimestampMillisecondArray>()
251                    .with_context(|| PlanSnafu {
252                        reason: format!("Failed to create vector from arrow array {array:?}"),
253                    })?
254                    .values()
255                    .iter()
256                    .map(|d| Some(Timestamp::new(*d, time_unit)))
257                    .collect_vec(),
258                TimeUnit::Microsecond => array
259                    .as_ref()
260                    .as_any()
261                    .downcast_ref::<TimestampMicrosecondArray>()
262                    .with_context(|| PlanSnafu {
263                        reason: format!("Failed to create vector from arrow array {array:?}"),
264                    })?
265                    .values()
266                    .iter()
267                    .map(|d| Some(Timestamp::new(*d, time_unit)))
268                    .collect_vec(),
269                TimeUnit::Nanosecond => array
270                    .as_ref()
271                    .as_any()
272                    .downcast_ref::<TimestampNanosecondArray>()
273                    .with_context(|| PlanSnafu {
274                        reason: format!("Failed to create vector from arrow array {array:?}"),
275                    })?
276                    .values()
277                    .iter()
278                    .map(|d| Some(Timestamp::new(*d, time_unit)))
279                    .collect_vec(),
280            }
281        }
282        datafusion_expr::ColumnarValue::Scalar(scalar) => {
283            let value = Value::try_from(scalar.clone()).with_context(|_| DatatypesSnafu {
284                extra: format!("Failed to convert scalar {scalar:?} to value"),
285            })?;
286            let ts = value.as_timestamp().context(UnexpectedSnafu {
287                reason: format!("Expect Timestamp, found {:?}", value),
288            })?;
289            vec![Some(ts)]
290        }
291    };
292    Ok(val)
293}
294
/// Return (`the column name of time index column`, `the time window expr`, `the expected time unit of time index column`, `the expr's schema for evaluating the time window`)
///
/// The time window expr is expected to have one input column with Timestamp type, and also return Timestamp type, the time window expr is expected
/// to be monotonic increasing and appears in the innermost GROUP BY clause
///
/// note this plan should only contain one TableScan
///
/// Steps performed:
/// 1. take the table name from the first `TableScan` encountered while walking `plan`;
/// 2. resolve that table through `catalog_man`, falling back to the session's current
///    catalog/schema when the table reference is not qualified;
/// 3. read the table's timestamp column (name and unit) and build a single-field `DFSchema`
///    for it, qualified by the bare table name;
/// 4. among the GROUP BY exprs of the last `Aggregate` visited by `plan.apply`, pick the first
///    one that references exactly one column whose input field carries
///    `TIME_INDEX_KEY == "true"` metadata, and requalify its columns with the bare table name so
///    it can be evaluated against the schema from step 3.
///
/// The expr slot of the result is `None` when the plan has no `Aggregate` or no GROUP BY expr
/// matches step 4.
///
/// # Errors
/// Fails if no `TableScan` is found, the table cannot be found in the catalog, the table has no
/// timestamp column (or its type is not a timestamp), or building the `DFSchema` / rewriting the
/// expr fails.
pub async fn find_time_window_expr(
    plan: &LogicalPlan,
    catalog_man: CatalogManagerRef,
    query_ctx: QueryContextRef,
) -> Result<(String, Option<datafusion_expr::Expr>, TimeUnit, DFSchema), Error> {
    // TODO(discord9): find the expr that do time window

    let mut table_name = None;

    // first find the table source in the logical plan
    // (`Stop` after the first match, so only the first `TableScan` visited is used)
    plan.apply(|plan| {
        let LogicalPlan::TableScan(table_scan) = plan else {
            return Ok(TreeNodeRecursion::Continue);
        };
        table_name = Some(table_scan.table_name.clone());
        Ok(TreeNodeRecursion::Stop)
    })
    .with_context(|_| DatafusionSnafu {
        context: format!("Can't find table source in plan {plan:?}"),
    })?;
    let Some(table_name) = table_name else {
        UnexpectedSnafu {
            reason: format!("Can't find table source in plan {plan:?}"),
        }
        .fail()?
    };

    // Bound to a local so `schema_name` below can borrow it as a fallback.
    let current_schema = query_ctx.current_schema();

    // Unqualified table references fall back to the session's current catalog/schema.
    let catalog_name = table_name.catalog().unwrap_or(query_ctx.current_catalog());
    let schema_name = table_name.schema().unwrap_or(&current_schema);
    let table_name = table_name.table();

    let Some(table_ref) = catalog_man
        .table(catalog_name, schema_name, table_name, Some(&query_ctx))
        .await
        .map_err(BoxedError::new)
        .context(ExternalSnafu)?
    else {
        UnexpectedSnafu {
            reason: format!(
                "Can't find table {table_name:?} in catalog {catalog_name:?}/{schema_name:?}"
            ),
        }
        .fail()?
    };

    let schema = &table_ref.table_info().meta.schema;

    let ts_index = schema.timestamp_column().with_context(|| UnexpectedSnafu {
        reason: format!("Can't find timestamp column in table {table_name:?}"),
    })?;

    let ts_col_name = ts_index.name.clone();

    let expected_time_unit = ts_index.data_type.as_timestamp().with_context(|| UnexpectedSnafu {
        reason: format!(
            "Expected timestamp column {ts_col_name:?} in table {table_name:?} to be timestamp, but got {ts_index:?}"
        ),
    })?.unit();

    // Single-field schema holding only the timestamp column; the time window expr is later
    // evaluated against this.
    let arrow_schema = Arc::new(arrow_schema::Schema::new(vec![arrow_schema::Field::new(
        ts_col_name.clone(),
        ts_index.data_type.as_arrow_type(),
        false,
    )]));

    // Qualified by the bare table name; `RewriteColumn` below requalifies the expr's columns
    // the same way so they resolve against this schema.
    let df_schema = DFSchema::from_field_specific_qualified_schema(
        vec![Some(TableReference::bare(table_name))],
        &arrow_schema,
    )
    .with_context(|_e| DatafusionSnafu {
        context: format!("Failed to create DFSchema from arrow schema {arrow_schema:?}"),
    })?;

    // find the time window expr which refers to the time index column
    let mut aggr_expr = None;
    let mut time_window_expr: Option<Expr> = None;

    // Visits every node and overwrites `aggr_expr` each time, so the last `Aggregate` visited
    // by `apply` wins.
    let find_inner_aggr_expr = |plan: &LogicalPlan| {
        if let LogicalPlan::Aggregate(aggregate) = plan {
            aggr_expr = Some(aggregate.clone());
        };

        Ok(TreeNodeRecursion::Continue)
    };
    plan.apply(find_inner_aggr_expr)
        .with_context(|_| DatafusionSnafu {
            context: format!("Can't find aggr expr in plan {plan:?}"),
        })?;

    if let Some(aggregate) = aggr_expr {
        for group_expr in &aggregate.group_expr {
            // Only exprs over exactly one column can be a time window expr.
            let refs = group_expr.column_refs();
            if refs.len() != 1 {
                continue;
            }
            // Cannot fail: `refs` has exactly one element (checked above).
            let ref_col = refs.iter().next().unwrap();

            let index = aggregate.input.schema().maybe_index_of_column(ref_col);
            let Some(index) = index else {
                continue;
            };
            let field = aggregate.input.schema().field(index);

            // TODO(discord9): need to ensure the field has the meta key for the time index
            let is_time_index =
                field.metadata().get(TIME_INDEX_KEY).map(|s| s.as_str()) == Some("true");

            if is_time_index {
                let rewrite_column = group_expr.clone();
                let rewritten = rewrite_column
                    .rewrite(&mut RewriteColumn {
                        table_name: table_name.to_string(),
                    })
                    .with_context(|_| DatafusionSnafu {
                        context: format!("Rewrite expr failed, expr={:?}", group_expr),
                    })?
                    .data;
                /// Rewriter that sets the relation of every column in an expr to the bare
                /// `table_name`.
                struct RewriteColumn {
                    table_name: String,
                }

                impl TreeNodeRewriter for RewriteColumn {
                    type Node = Expr;
                    fn f_down(&mut self, node: Self::Node) -> DfResult<Transformed<Self::Node>> {
                        let Expr::Column(mut column) = node else {
                            return Ok(Transformed::no(node));
                        };

                        column.relation = Some(TableReference::bare(self.table_name.clone()));

                        Ok(Transformed::yes(Expr::Column(column)))
                    }
                }

                // The first matching GROUP BY expr is used.
                time_window_expr = Some(rewritten);
                break;
            }
        }
        Ok((ts_col_name, time_window_expr, expected_time_unit, df_schema))
    } else {
        // can't found time window expr, return None
        Ok((ts_col_name, None, expected_time_unit, df_schema))
    }
}
447
/// Compute the time window that `current` falls into for the time window expr of `plan`.
///
/// e.g. with time window expr `date_bin(INTERVAL '5 minutes', ts) as time_window` and
/// `current = "2021-07-01 00:01:01.000"`, the lower bound is `Some("2021-07-01 00:00:00.000")`.
///
/// Returns `(time index column name, lower bound, upper bound)`; both bounds are `None` when
/// `plan` has no usable time window expr. `current` is first converted to the time index
/// column's unit.
///
/// A time window expr:
/// 1. refs only a time index column
/// 2. is monotonic increasing
/// 3. shows up in the GROUP BY clause
///
/// note this plan should only contain one TableScan
#[cfg(test)]
pub async fn find_plan_time_window_bound(
    plan: &LogicalPlan,
    current: Timestamp,
    query_ctx: QueryContextRef,
    engine: query::QueryEngineRef,
) -> Result<(String, Option<Timestamp>, Option<Timestamp>), Error> {
    // TODO(discord9): find the expr that do time window
    let catalog_man = engine.engine_state().catalog_manager();

    let (ts_col_name, time_window_expr, expected_time_unit, df_schema) =
        find_time_window_expr(plan, catalog_man.clone(), query_ctx).await?;

    // Align `current` with the time index column's unit before probing.
    let aligned_current = current
        .convert_to(expected_time_unit)
        .with_context(|| UnexpectedSnafu {
            reason: format!("Failed to cast current timestamp {current:?} to {expected_time_unit}"),
        })?;

    // Without a time window expr there is nothing to bound.
    let Some(window_expr) = time_window_expr else {
        return Ok((ts_col_name, None, None));
    };

    let session_state = engine.engine_state().session_state();
    let phy_expr = to_phy_expr(&window_expr, &df_schema, &session_state)?;
    let lower = calc_expr_time_window_lower_bound(&phy_expr, &df_schema, aligned_current)?;
    let upper = probe_expr_time_window_upper_bound(&phy_expr, &df_schema, aligned_current)?;
    Ok((ts_col_name, lower, upper))
}
492
493/// Find the lower bound of time window in given `expr` and `current` timestamp.
494///
495/// i.e. for `current="2021-07-01 00:01:01.000"` and `expr=date_bin(INTERVAL '5 minutes', ts) as time_window` and `ts_col=ts`,
496/// return `Some("2021-07-01 00:00:00.000")` since it's the lower bound
497/// return `Some("2021-07-01 00:00:00.000")` since it's the lower bound
498/// of current time window given the current timestamp
499///
500/// if return None, meaning this time window have no lower bound
501fn calc_expr_time_window_lower_bound(
502    phy_expr: &PhysicalExprRef,
503    df_schema: &DFSchema,
504    current: Timestamp,
505) -> Result<Option<Timestamp>, Error> {
506    let cur_time_window = eval_phy_time_window_expr(phy_expr, df_schema, current)?;
507    let input_time_unit = cur_time_window.unit();
508    Ok(cur_time_window.convert_to(input_time_unit))
509}
510
511/// Probe for the upper bound for time window expression
512fn probe_expr_time_window_upper_bound(
513    phy_expr: &PhysicalExprRef,
514    df_schema: &DFSchema,
515    current: Timestamp,
516) -> Result<Option<Timestamp>, Error> {
517    // TODO(discord9): special handling `date_bin` for faster path
518    use std::cmp::Ordering;
519
520    let cur_time_window = eval_phy_time_window_expr(phy_expr, df_schema, current)?;
521
522    // search to find the lower bound
523    let mut offset: i64 = 1;
524    let mut lower_bound = Some(current);
525    let upper_bound;
526    // first expontial probe to found a range for binary search
527    loop {
528        let Some(next_val) = current.value().checked_add(offset) else {
529            // no upper bound if overflow, which is ok
530            return Ok(None);
531        };
532
533        let next_time_probe = common_time::Timestamp::new(next_val, current.unit());
534
535        let next_time_window = eval_phy_time_window_expr(phy_expr, df_schema, next_time_probe)?;
536
537        match next_time_window.cmp(&cur_time_window) {
538            Ordering::Less => UnexpectedSnafu {
539                    reason: format!(
540                        "Unsupported time window expression, expect monotonic increasing for time window expression {phy_expr:?}"
541                    ),
542                }
543                .fail()?,
544            Ordering::Equal => {
545                lower_bound = Some(next_time_probe);
546            }
547            Ordering::Greater => {
548                upper_bound = Some(next_time_probe);
549                break
550            }
551        }
552
553        let Some(new_offset) = offset.checked_mul(2) else {
554            // no upper bound if overflow
555            return Ok(None);
556        };
557        offset = new_offset;
558    }
559
560    // binary search for the exact upper bound
561
562    binary_search_expr(
563        lower_bound,
564        upper_bound,
565        cur_time_window,
566        phy_expr,
567        df_schema,
568    )
569    .map(Some)
570}
571
572fn binary_search_expr(
573    lower_bound: Option<Timestamp>,
574    upper_bound: Option<Timestamp>,
575    cur_time_window: Timestamp,
576    phy_expr: &PhysicalExprRef,
577    df_schema: &DFSchema,
578) -> Result<Timestamp, Error> {
579    ensure!(lower_bound.map(|v|v.unit()) == upper_bound.map(|v| v.unit()), UnexpectedSnafu {
580        reason: format!(" unit mismatch for time window expression {phy_expr:?}, found {lower_bound:?} and {upper_bound:?}"),
581    });
582
583    let output_unit = upper_bound
584        .context(UnexpectedSnafu {
585            reason: "should have lower bound",
586        })?
587        .unit();
588
589    let mut low = lower_bound
590        .context(UnexpectedSnafu {
591            reason: "should have lower bound",
592        })?
593        .value();
594    let mut high = upper_bound
595        .context(UnexpectedSnafu {
596            reason: "should have upper bound",
597        })?
598        .value();
599    while low < high {
600        let mid = (low + high) / 2;
601        let mid_probe = common_time::Timestamp::new(mid, output_unit);
602        let mid_time_window = eval_phy_time_window_expr(phy_expr, df_schema, mid_probe)?;
603
604        match mid_time_window.cmp(&cur_time_window) {
605            std::cmp::Ordering::Less => UnexpectedSnafu {
606                reason: format!("Binary search failed for time window expression {phy_expr:?}"),
607            }
608            .fail()?,
609            std::cmp::Ordering::Equal => low = mid + 1,
610            std::cmp::Ordering::Greater => high = mid,
611        }
612    }
613
614    let final_upper_bound_for_time_window = common_time::Timestamp::new(high, output_unit);
615    Ok(final_upper_bound_for_time_window)
616}
617
618/// Expect the `phy` expression only have one input column with Timestamp type, and also return Timestamp type
619fn eval_phy_time_window_expr(
620    phy: &PhysicalExprRef,
621    df_schema: &DFSchema,
622    input_value: Timestamp,
623) -> Result<Timestamp, Error> {
624    let schema_ty = df_schema.field(0).data_type();
625    let schema_cdt = ConcreteDataType::from_arrow_type(schema_ty);
626    let schema_unit = if let ConcreteDataType::Timestamp(ts) = schema_cdt {
627        ts.unit()
628    } else {
629        return UnexpectedSnafu {
630            reason: format!("Expect Timestamp, found {:?}", schema_cdt),
631        }
632        .fail();
633    };
634    let input_value = input_value
635        .convert_to(schema_unit)
636        .with_context(|| UnexpectedSnafu {
637            reason: format!("Failed to convert timestamp {input_value:?} to {schema_unit}"),
638        })?;
639    let ts_vector = match schema_unit {
640        TimeUnit::Second => {
641            TimestampSecondVector::from_vec(vec![input_value.value()]).to_arrow_array()
642        }
643        TimeUnit::Millisecond => {
644            TimestampMillisecondVector::from_vec(vec![input_value.value()]).to_arrow_array()
645        }
646        TimeUnit::Microsecond => {
647            TimestampMicrosecondVector::from_vec(vec![input_value.value()]).to_arrow_array()
648        }
649        TimeUnit::Nanosecond => {
650            TimestampNanosecondVector::from_vec(vec![input_value.value()]).to_arrow_array()
651        }
652    };
653
654    let rb = DfRecordBatch::try_new(df_schema.inner().clone(), vec![ts_vector.clone()])
655        .with_context(|_| ArrowSnafu {
656            context: format!("Failed to create record batch from {df_schema:?} and {ts_vector:?}"),
657        })?;
658
659    let eval_res = phy.evaluate(&rb).with_context(|_| DatafusionSnafu {
660        context: format!("Failed to evaluate physical expression {phy:?} on {rb:?}"),
661    })?;
662
663    if let Some(Some(ts)) = columnar_to_ts_vector(&eval_res)?.first() {
664        Ok(*ts)
665    } else {
666        UnexpectedSnafu {
667            reason: format!(
668                "Expected timestamp in expression {phy:?} but got {:?}",
669                eval_res
670            ),
671        }
672        .fail()?
673    }
674}
675
676fn to_phy_expr(
677    expr: &Expr,
678    df_schema: &DFSchema,
679    session: &SessionState,
680) -> Result<PhysicalExprRef, Error> {
681    let phy_planner = DefaultPhysicalPlanner::default();
682
683    let phy_expr: PhysicalExprRef = phy_planner
684        .create_physical_expr(expr, df_schema, session)
685        .with_context(|_e| DatafusionSnafu {
686            context: format!(
687                "Failed to create physical expression from {expr:?} using {df_schema:?}"
688            ),
689        })?;
690    Ok(phy_expr)
691}
692
693#[cfg(test)]
694mod test {
695    use datafusion_common::tree_node::TreeNode;
696    use pretty_assertions::assert_eq;
697    use session::context::QueryContext;
698
699    use super::*;
700    use crate::batching_mode::utils::{df_plan_to_sql, sql_to_df_plan, AddFilterRewriter};
701    use crate::test_utils::create_test_query_engine;
702
703    #[tokio::test]
704    async fn test_plan_time_window_lower_bound() {
705        use datafusion_expr::{col, lit};
706        let query_engine = create_test_query_engine();
707        let ctx = QueryContext::arc();
708
709        let testcases = [
710            // same alias is not same column
711            (
712                "SELECT arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)') AS ts FROM numbers_with_ts GROUP BY ts;",
713                Timestamp::new(1740394109, TimeUnit::Second),
714                (
715                    "ts".to_string(),
716                    Some(Timestamp::new(1740394109000, TimeUnit::Millisecond)),
717                    Some(Timestamp::new(1740394109001, TimeUnit::Millisecond)),
718                ),
719                r#"SELECT arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)') AS ts FROM numbers_with_ts WHERE ((ts >= CAST('2025-02-24 10:48:29' AS TIMESTAMP)) AND (ts <= CAST('2025-02-24 10:48:29.001' AS TIMESTAMP))) GROUP BY numbers_with_ts.ts"#
720            ),
721            // complex time window index
722            (
723                "SELECT arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)') AS time_window FROM numbers_with_ts GROUP BY time_window;",
724                Timestamp::new(1740394109, TimeUnit::Second),
725                (
726                    "ts".to_string(),
727                    Some(Timestamp::new(1740394080, TimeUnit::Second)),
728                    Some(Timestamp::new(1740394140, TimeUnit::Second)),
729                ),
730                "SELECT arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)') AS time_window FROM numbers_with_ts WHERE ((ts >= CAST('2025-02-24 10:48:00' AS TIMESTAMP)) AND (ts <= CAST('2025-02-24 10:49:00' AS TIMESTAMP))) GROUP BY arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)')"
731            ),
732            // complex time window index with where
733            (
734                "SELECT arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)') AS time_window FROM numbers_with_ts WHERE number in (2, 3, 4) GROUP BY time_window;",
735                Timestamp::new(1740394109, TimeUnit::Second),
736                (
737                    "ts".to_string(),
738                    Some(Timestamp::new(1740394080, TimeUnit::Second)),
739                    Some(Timestamp::new(1740394140, TimeUnit::Second)),
740                ),
741                "SELECT arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)') AS time_window FROM numbers_with_ts WHERE numbers_with_ts.number IN (2, 3, 4) AND ((ts >= CAST('2025-02-24 10:48:00' AS TIMESTAMP)) AND (ts <= CAST('2025-02-24 10:49:00' AS TIMESTAMP))) GROUP BY arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)')"
742            ),
743            // complex time window index with between and
744            (
745                "SELECT arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)') AS time_window FROM numbers_with_ts WHERE number BETWEEN 2 AND 4 GROUP BY time_window;",
746                Timestamp::new(1740394109, TimeUnit::Second),
747                (
748                    "ts".to_string(),
749                    Some(Timestamp::new(1740394080, TimeUnit::Second)),
750                    Some(Timestamp::new(1740394140, TimeUnit::Second)),
751                ),
752                "SELECT arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)') AS time_window FROM numbers_with_ts WHERE (numbers_with_ts.number BETWEEN 2 AND 4) AND ((ts >= CAST('2025-02-24 10:48:00' AS TIMESTAMP)) AND (ts <= CAST('2025-02-24 10:49:00' AS TIMESTAMP))) GROUP BY arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)')"
753            ),
            // no time window expression in GROUP BY: no bounds found, SQL left unchanged
755            (
756                "SELECT date_bin('5 minutes', ts) FROM numbers_with_ts;",
757                Timestamp::new(23, TimeUnit::Millisecond),
758                ("ts".to_string(), None, None),
759                "SELECT date_bin('5 minutes', ts) FROM numbers_with_ts;"
760            ),
761            // time index
762            (
763                "SELECT date_bin('5 minutes', ts) as time_window FROM numbers_with_ts GROUP BY time_window;",
764                Timestamp::new(23, TimeUnit::Nanosecond),
765                (
766                    "ts".to_string(),
767                    Some(Timestamp::new(0, TimeUnit::Millisecond)),
768                    Some(Timestamp::new(300000, TimeUnit::Millisecond)),
769                ),
770                "SELECT date_bin('5 minutes', numbers_with_ts.ts) AS time_window FROM numbers_with_ts WHERE ((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts <= CAST('1970-01-01 00:05:00' AS TIMESTAMP))) GROUP BY date_bin('5 minutes', numbers_with_ts.ts)"
771            ),
            // current time lies exactly on a window boundary
773            (
774                "SELECT date_bin('5 minutes', ts) as time_window FROM numbers_with_ts GROUP BY time_window;",
775                Timestamp::new(0, TimeUnit::Nanosecond),
776                (
777                    "ts".to_string(),
778                    Some(Timestamp::new(0, TimeUnit::Millisecond)),
779                    Some(Timestamp::new(300000, TimeUnit::Millisecond)),
780                ),
781                "SELECT date_bin('5 minutes', numbers_with_ts.ts) AS time_window FROM numbers_with_ts WHERE ((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts <= CAST('1970-01-01 00:05:00' AS TIMESTAMP))) GROUP BY date_bin('5 minutes', numbers_with_ts.ts)"
782            ),
783            // different time unit
784            (
785                "SELECT date_bin('5 minutes', ts) as time_window FROM numbers_with_ts GROUP BY time_window;",
786                Timestamp::new(23_000_000, TimeUnit::Nanosecond),
787                (
788                    "ts".to_string(),
789                    Some(Timestamp::new(0, TimeUnit::Millisecond)),
790                    Some(Timestamp::new(300000, TimeUnit::Millisecond)),
791                ),
792                "SELECT date_bin('5 minutes', numbers_with_ts.ts) AS time_window FROM numbers_with_ts WHERE ((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts <= CAST('1970-01-01 00:05:00' AS TIMESTAMP))) GROUP BY date_bin('5 minutes', numbers_with_ts.ts)"
793            ),
794            // time index with other fields
795            (
796                "SELECT sum(number) as sum_up, date_bin('5 minutes', ts) as time_window FROM numbers_with_ts GROUP BY time_window;",
797                Timestamp::new(23, TimeUnit::Millisecond),
798                (
799                    "ts".to_string(),
800                    Some(Timestamp::new(0, TimeUnit::Millisecond)),
801                    Some(Timestamp::new(300000, TimeUnit::Millisecond)),
802                ),
803                "SELECT sum(numbers_with_ts.number) AS sum_up, date_bin('5 minutes', numbers_with_ts.ts) AS time_window FROM numbers_with_ts WHERE ((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts <= CAST('1970-01-01 00:05:00' AS TIMESTAMP))) GROUP BY date_bin('5 minutes', numbers_with_ts.ts)"
804            ),
805            // time index with other pks
806            (
807                "SELECT number, date_bin('5 minutes', ts) as time_window FROM numbers_with_ts GROUP BY time_window, number;",
808                Timestamp::new(23, TimeUnit::Millisecond),
809                (
810                    "ts".to_string(),
811                    Some(Timestamp::new(0, TimeUnit::Millisecond)),
812                    Some(Timestamp::new(300000, TimeUnit::Millisecond)),
813                ),
814                "SELECT numbers_with_ts.number, date_bin('5 minutes', numbers_with_ts.ts) AS time_window FROM numbers_with_ts WHERE ((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts <= CAST('1970-01-01 00:05:00' AS TIMESTAMP))) GROUP BY date_bin('5 minutes', numbers_with_ts.ts), numbers_with_ts.number"
815            ),
816            // subquery
817            (
818                "SELECT number, time_window FROM (SELECT number, date_bin('5 minutes', ts) as time_window FROM numbers_with_ts GROUP BY time_window, number);",
819                Timestamp::new(23, TimeUnit::Millisecond),
820                (
821                    "ts".to_string(),
822                    Some(Timestamp::new(0, TimeUnit::Millisecond)),
823                    Some(Timestamp::new(300000, TimeUnit::Millisecond)),
824                ),
825                "SELECT numbers_with_ts.number, time_window FROM (SELECT numbers_with_ts.number, date_bin('5 minutes', numbers_with_ts.ts) AS time_window FROM numbers_with_ts WHERE ((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts <= CAST('1970-01-01 00:05:00' AS TIMESTAMP))) GROUP BY date_bin('5 minutes', numbers_with_ts.ts), numbers_with_ts.number)"
826            ),
827            // cte
828            (
829                "with cte as (select number, date_bin('5 minutes', ts) as time_window from numbers_with_ts GROUP BY time_window, number) select number, time_window from cte;",
830                Timestamp::new(23, TimeUnit::Millisecond),
831                (
832                    "ts".to_string(),
833                    Some(Timestamp::new(0, TimeUnit::Millisecond)),
834                    Some(Timestamp::new(300000, TimeUnit::Millisecond)),
835                ),
836                "SELECT cte.number, cte.time_window FROM (SELECT numbers_with_ts.number, date_bin('5 minutes', numbers_with_ts.ts) AS time_window FROM numbers_with_ts WHERE ((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts <= CAST('1970-01-01 00:05:00' AS TIMESTAMP))) GROUP BY date_bin('5 minutes', numbers_with_ts.ts), numbers_with_ts.number) AS cte"
837            ),
838            // complex subquery without alias
839            (
840                "SELECT sum(number), number, date_bin('5 minutes', ts) as time_window, bucket_name FROM (SELECT number, ts, case when number < 5 THEN 'bucket_0_5' when number >= 5 THEN 'bucket_5_inf' END as bucket_name FROM numbers_with_ts) GROUP BY number, time_window, bucket_name;",
841                Timestamp::new(23, TimeUnit::Millisecond),
842                (
843                    "ts".to_string(),
844                    Some(Timestamp::new(0, TimeUnit::Millisecond)),
845                    Some(Timestamp::new(300000, TimeUnit::Millisecond)),
846                ),
847                "SELECT sum(numbers_with_ts.number), numbers_with_ts.number, date_bin('5 minutes', numbers_with_ts.ts) AS time_window, bucket_name FROM (SELECT numbers_with_ts.number, numbers_with_ts.ts, CASE WHEN (numbers_with_ts.number < 5) THEN 'bucket_0_5' WHEN (numbers_with_ts.number >= 5) THEN 'bucket_5_inf' END AS bucket_name FROM numbers_with_ts WHERE ((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts <= CAST('1970-01-01 00:05:00' AS TIMESTAMP)))) GROUP BY numbers_with_ts.number, date_bin('5 minutes', numbers_with_ts.ts), bucket_name"
848            ),
849            // complex subquery alias
850            (
851                "SELECT sum(number), number, date_bin('5 minutes', ts) as time_window, bucket_name FROM (SELECT number, ts, case when number < 5 THEN 'bucket_0_5' when number >= 5 THEN 'bucket_5_inf' END as bucket_name FROM numbers_with_ts) as cte GROUP BY number, time_window, bucket_name;",
852                Timestamp::new(23, TimeUnit::Millisecond),
853                (
854                    "ts".to_string(),
855                    Some(Timestamp::new(0, TimeUnit::Millisecond)),
856                    Some(Timestamp::new(300000, TimeUnit::Millisecond)),
857                ),
858                "SELECT sum(cte.number), cte.number, date_bin('5 minutes', cte.ts) AS time_window, cte.bucket_name FROM (SELECT numbers_with_ts.number, numbers_with_ts.ts, CASE WHEN (numbers_with_ts.number < 5) THEN 'bucket_0_5' WHEN (numbers_with_ts.number >= 5) THEN 'bucket_5_inf' END AS bucket_name FROM numbers_with_ts WHERE ((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts <= CAST('1970-01-01 00:05:00' AS TIMESTAMP)))) AS cte GROUP BY cte.number, date_bin('5 minutes', cte.ts), cte.bucket_name"
859            ),
860        ];
861
862        for (sql, current, expected, expected_unparsed) in testcases {
863            let plan = sql_to_df_plan(ctx.clone(), query_engine.clone(), sql, true)
864                .await
865                .unwrap();
866
867            let real =
868                find_plan_time_window_bound(&plan, current, ctx.clone(), query_engine.clone())
869                    .await
870                    .unwrap();
871            assert_eq!(expected, real);
872
873            let plan = sql_to_df_plan(ctx.clone(), query_engine.clone(), sql, false)
874                .await
875                .unwrap();
876            let (col_name, lower, upper) = real;
877            let new_sql = if lower.is_some() {
878                let to_df_literal = |value| {
879                    let value = Value::from(value);
880
881                    value.try_to_scalar_value(&value.data_type()).unwrap()
882                };
883                let lower = to_df_literal(lower.unwrap());
884                let upper = to_df_literal(upper.unwrap());
885                let expr = col(&col_name)
886                    .gt_eq(lit(lower))
887                    .and(col(&col_name).lt_eq(lit(upper)));
888                let mut add_filter = AddFilterRewriter::new(expr);
889                let plan = plan.rewrite(&mut add_filter).unwrap().data;
890                df_plan_to_sql(&plan).unwrap()
891            } else {
892                sql.to_string()
893            };
894            assert_eq!(expected_unparsed, new_sql);
895        }
896    }
897}