1use std::collections::BTreeSet;
19use std::sync::Arc;
20
21use api::helper::pb_value_to_value_ref;
22use arrow::array::{
23 TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
24 TimestampSecondArray,
25};
26use catalog::CatalogManagerRef;
27use common_error::ext::BoxedError;
28use common_recordbatch::DfRecordBatch;
29use common_telemetry::warn;
30use common_time::timestamp::TimeUnit;
31use common_time::Timestamp;
32use datafusion::error::Result as DfResult;
33use datafusion::execution::SessionState;
34use datafusion::logical_expr::Expr;
35use datafusion::physical_planner::{DefaultPhysicalPlanner, PhysicalPlanner};
36use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRecursion, TreeNodeRewriter};
37use datafusion_common::{DFSchema, TableReference};
38use datafusion_expr::{ColumnarValue, LogicalPlan};
39use datafusion_physical_expr::PhysicalExprRef;
40use datatypes::prelude::{ConcreteDataType, DataType};
41use datatypes::schema::TIME_INDEX_KEY;
42use datatypes::value::Value;
43use datatypes::vectors::{
44 TimestampMicrosecondVector, TimestampMillisecondVector, TimestampNanosecondVector,
45 TimestampSecondVector, Vector,
46};
47use itertools::Itertools;
48use session::context::QueryContextRef;
49use snafu::{ensure, OptionExt, ResultExt};
50
51use crate::adapter::util::from_proto_to_data_type;
52use crate::error::{
53 ArrowSnafu, DatafusionSnafu, DatatypesSnafu, ExternalSnafu, PlanSnafu, UnexpectedSnafu,
54};
55use crate::expr::error::DataTypeSnafu;
56use crate::Error;
57
/// A time window expression bound to a single time column.
///
/// Holds both the logical expression and the physical expression planned from it,
/// together with the `DFSchema` the physical expression was planned against.
#[derive(Debug, Clone)]
pub struct TimeWindowExpr {
    /// Physical form of `logical_expr`, planned against `df_schema`.
    phy_expr: PhysicalExprRef,
    /// Name of the column looked up in incoming rows by `handle_rows`.
    pub column_name: String,
    /// The logical expression this was built from; kept for display/debugging.
    logical_expr: Expr,
    /// Schema used to plan `phy_expr` and to build probe record batches in `eval`.
    df_schema: DFSchema,
}
74
75impl std::fmt::Display for TimeWindowExpr {
76 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
77 f.debug_struct("TimeWindowExpr")
78 .field("phy_expr", &self.phy_expr.to_string())
79 .field("column_name", &self.column_name)
80 .field("logical_expr", &self.logical_expr.to_string())
81 .field("df_schema", &self.df_schema)
82 .finish()
83 }
84}
85
86impl TimeWindowExpr {
87 pub fn from_expr(
88 expr: &Expr,
89 column_name: &str,
90 df_schema: &DFSchema,
91 session: &SessionState,
92 ) -> Result<Self, Error> {
93 let phy_expr: PhysicalExprRef = to_phy_expr(expr, df_schema, session)?;
94 Ok(Self {
95 phy_expr,
96 column_name: column_name.to_string(),
97 logical_expr: expr.clone(),
98 df_schema: df_schema.clone(),
99 })
100 }
101
102 pub fn eval(
103 &self,
104 current: Timestamp,
105 ) -> Result<(Option<Timestamp>, Option<Timestamp>), Error> {
106 let lower_bound =
107 calc_expr_time_window_lower_bound(&self.phy_expr, &self.df_schema, current)?;
108 let upper_bound =
109 probe_expr_time_window_upper_bound(&self.phy_expr, &self.df_schema, current)?;
110 Ok((lower_bound, upper_bound))
111 }
112
113 pub async fn handle_rows(
117 &self,
118 rows_list: Vec<api::v1::Rows>,
119 ) -> Result<BTreeSet<Timestamp>, Error> {
120 let mut time_windows = BTreeSet::new();
121
122 for rows in rows_list {
123 let ts_col_index = rows
126 .schema
127 .iter()
128 .map(|col| col.column_name.clone())
129 .position(|name| name == self.column_name);
130 let Some(ts_col_index) = ts_col_index else {
131 warn!("can't found time index column in schema: {:?}", rows.schema);
132 continue;
133 };
134 let col_schema = &rows.schema[ts_col_index];
135 let cdt = from_proto_to_data_type(col_schema)?;
136
137 let mut vector = cdt.create_mutable_vector(rows.rows.len());
138 for row in rows.rows {
139 let value = pb_value_to_value_ref(&row.values[ts_col_index], &None);
140 vector.try_push_value_ref(value).context(DataTypeSnafu {
141 msg: "Failed to convert rows to columns",
142 })?;
143 }
144 let vector = vector.to_vector();
145
146 let df_schema = create_df_schema_for_ts_column(&self.column_name, cdt)?;
147
148 let rb =
149 DfRecordBatch::try_new(df_schema.inner().clone(), vec![vector.to_arrow_array()])
150 .with_context(|_e| ArrowSnafu {
151 context: format!(
152 "Failed to create record batch from {df_schema:?} and {vector:?}"
153 ),
154 })?;
155
156 let eval_res = self
157 .phy_expr
158 .evaluate(&rb)
159 .with_context(|_| DatafusionSnafu {
160 context: format!(
161 "Failed to evaluate physical expression {:?} on {rb:?}",
162 self.phy_expr
163 ),
164 })?;
165
166 let res = columnar_to_ts_vector(&eval_res)?;
167
168 for ts in res.into_iter().flatten() {
169 time_windows.insert(ts);
170 }
171 }
172
173 Ok(time_windows)
174 }
175}
176
177fn create_df_schema_for_ts_column(name: &str, cdt: ConcreteDataType) -> Result<DFSchema, Error> {
178 let arrow_schema = Arc::new(arrow_schema::Schema::new(vec![arrow_schema::Field::new(
179 name,
180 cdt.as_arrow_type(),
181 false,
182 )]));
183
184 let df_schema = DFSchema::from_field_specific_qualified_schema(
185 vec![Some(TableReference::bare("TimeIndexOnlyTable"))],
186 &arrow_schema,
187 )
188 .with_context(|_e| DatafusionSnafu {
189 context: format!("Failed to create DFSchema from arrow schema {arrow_schema:?}"),
190 })?;
191
192 Ok(df_schema)
193}
194
195fn columnar_to_ts_vector(columnar: &ColumnarValue) -> Result<Vec<Option<Timestamp>>, Error> {
197 let val = match columnar {
198 datafusion_expr::ColumnarValue::Array(array) => {
199 let ty = array.data_type();
200 let ty = ConcreteDataType::from_arrow_type(ty);
201 let time_unit = if let ConcreteDataType::Timestamp(ty) = ty {
202 ty.unit()
203 } else {
204 return UnexpectedSnafu {
205 reason: format!("Non-timestamp type: {ty:?}"),
206 }
207 .fail();
208 };
209
210 match time_unit {
211 TimeUnit::Second => array
212 .as_ref()
213 .as_any()
214 .downcast_ref::<TimestampSecondArray>()
215 .with_context(|| PlanSnafu {
216 reason: format!("Failed to create vector from arrow array {array:?}"),
217 })?
218 .values()
219 .iter()
220 .map(|d| Some(Timestamp::new(*d, time_unit)))
221 .collect_vec(),
222 TimeUnit::Millisecond => array
223 .as_ref()
224 .as_any()
225 .downcast_ref::<TimestampMillisecondArray>()
226 .with_context(|| PlanSnafu {
227 reason: format!("Failed to create vector from arrow array {array:?}"),
228 })?
229 .values()
230 .iter()
231 .map(|d| Some(Timestamp::new(*d, time_unit)))
232 .collect_vec(),
233 TimeUnit::Microsecond => array
234 .as_ref()
235 .as_any()
236 .downcast_ref::<TimestampMicrosecondArray>()
237 .with_context(|| PlanSnafu {
238 reason: format!("Failed to create vector from arrow array {array:?}"),
239 })?
240 .values()
241 .iter()
242 .map(|d| Some(Timestamp::new(*d, time_unit)))
243 .collect_vec(),
244 TimeUnit::Nanosecond => array
245 .as_ref()
246 .as_any()
247 .downcast_ref::<TimestampNanosecondArray>()
248 .with_context(|| PlanSnafu {
249 reason: format!("Failed to create vector from arrow array {array:?}"),
250 })?
251 .values()
252 .iter()
253 .map(|d| Some(Timestamp::new(*d, time_unit)))
254 .collect_vec(),
255 }
256 }
257 datafusion_expr::ColumnarValue::Scalar(scalar) => {
258 let value = Value::try_from(scalar.clone()).with_context(|_| DatatypesSnafu {
259 extra: format!("Failed to convert scalar {scalar:?} to value"),
260 })?;
261 let ts = value.as_timestamp().context(UnexpectedSnafu {
262 reason: format!("Expect Timestamp, found {:?}", value),
263 })?;
264 vec![Some(ts)]
265 }
266 };
267 Ok(val)
268}
269
270pub async fn find_time_window_expr(
277 plan: &LogicalPlan,
278 catalog_man: CatalogManagerRef,
279 query_ctx: QueryContextRef,
280) -> Result<(String, Option<datafusion_expr::Expr>, TimeUnit, DFSchema), Error> {
281 let mut table_name = None;
284
285 plan.apply(|plan| {
287 let LogicalPlan::TableScan(table_scan) = plan else {
288 return Ok(TreeNodeRecursion::Continue);
289 };
290 table_name = Some(table_scan.table_name.clone());
291 Ok(TreeNodeRecursion::Stop)
292 })
293 .with_context(|_| DatafusionSnafu {
294 context: format!("Can't find table source in plan {plan:?}"),
295 })?;
296 let Some(table_name) = table_name else {
297 UnexpectedSnafu {
298 reason: format!("Can't find table source in plan {plan:?}"),
299 }
300 .fail()?
301 };
302
303 let current_schema = query_ctx.current_schema();
304
305 let catalog_name = table_name.catalog().unwrap_or(query_ctx.current_catalog());
306 let schema_name = table_name.schema().unwrap_or(¤t_schema);
307 let table_name = table_name.table();
308
309 let Some(table_ref) = catalog_man
310 .table(catalog_name, schema_name, table_name, Some(&query_ctx))
311 .await
312 .map_err(BoxedError::new)
313 .context(ExternalSnafu)?
314 else {
315 UnexpectedSnafu {
316 reason: format!(
317 "Can't find table {table_name:?} in catalog {catalog_name:?}/{schema_name:?}"
318 ),
319 }
320 .fail()?
321 };
322
323 let schema = &table_ref.table_info().meta.schema;
324
325 let ts_index = schema.timestamp_column().with_context(|| UnexpectedSnafu {
326 reason: format!("Can't find timestamp column in table {table_name:?}"),
327 })?;
328
329 let ts_col_name = ts_index.name.clone();
330
331 let expected_time_unit = ts_index.data_type.as_timestamp().with_context(|| UnexpectedSnafu {
332 reason: format!(
333 "Expected timestamp column {ts_col_name:?} in table {table_name:?} to be timestamp, but got {ts_index:?}"
334 ),
335 })?.unit();
336
337 let arrow_schema = Arc::new(arrow_schema::Schema::new(vec![arrow_schema::Field::new(
338 ts_col_name.clone(),
339 ts_index.data_type.as_arrow_type(),
340 false,
341 )]));
342
343 let df_schema = DFSchema::from_field_specific_qualified_schema(
344 vec![Some(TableReference::bare(table_name))],
345 &arrow_schema,
346 )
347 .with_context(|_e| DatafusionSnafu {
348 context: format!("Failed to create DFSchema from arrow schema {arrow_schema:?}"),
349 })?;
350
351 let mut aggr_expr = None;
353 let mut time_window_expr: Option<Expr> = None;
354
355 let find_inner_aggr_expr = |plan: &LogicalPlan| {
356 if let LogicalPlan::Aggregate(aggregate) = plan {
357 aggr_expr = Some(aggregate.clone());
358 };
359
360 Ok(TreeNodeRecursion::Continue)
361 };
362 plan.apply(find_inner_aggr_expr)
363 .with_context(|_| DatafusionSnafu {
364 context: format!("Can't find aggr expr in plan {plan:?}"),
365 })?;
366
367 if let Some(aggregate) = aggr_expr {
368 for group_expr in &aggregate.group_expr {
369 let refs = group_expr.column_refs();
370 if refs.len() != 1 {
371 continue;
372 }
373 let ref_col = refs.iter().next().unwrap();
374
375 let index = aggregate.input.schema().maybe_index_of_column(ref_col);
376 let Some(index) = index else {
377 continue;
378 };
379 let field = aggregate.input.schema().field(index);
380
381 let is_time_index =
383 field.metadata().get(TIME_INDEX_KEY).map(|s| s.as_str()) == Some("true");
384
385 if is_time_index {
386 let rewrite_column = group_expr.clone();
387 let rewritten = rewrite_column
388 .rewrite(&mut RewriteColumn {
389 table_name: table_name.to_string(),
390 })
391 .with_context(|_| DatafusionSnafu {
392 context: format!("Rewrite expr failed, expr={:?}", group_expr),
393 })?
394 .data;
395 struct RewriteColumn {
396 table_name: String,
397 }
398
399 impl TreeNodeRewriter for RewriteColumn {
400 type Node = Expr;
401 fn f_down(&mut self, node: Self::Node) -> DfResult<Transformed<Self::Node>> {
402 let Expr::Column(mut column) = node else {
403 return Ok(Transformed::no(node));
404 };
405
406 column.relation = Some(TableReference::bare(self.table_name.clone()));
407
408 Ok(Transformed::yes(Expr::Column(column)))
409 }
410 }
411
412 time_window_expr = Some(rewritten);
413 break;
414 }
415 }
416 Ok((ts_col_name, time_window_expr, expected_time_unit, df_schema))
417 } else {
418 Ok((ts_col_name, None, expected_time_unit, df_schema))
420 }
421}
422
#[cfg(test)]
/// Test helper: resolves the plan's time window expression and evaluates its
/// bounds around `current`, after converting `current` into the unit of the
/// table's time index column.
///
/// Returns `(time index column name, lower bound, upper bound)`. Both bounds are
/// `None` when the plan has no time window expression.
pub async fn find_plan_time_window_bound(
    plan: &LogicalPlan,
    current: Timestamp,
    query_ctx: QueryContextRef,
    engine: query::QueryEngineRef,
) -> Result<(String, Option<Timestamp>, Option<Timestamp>), Error> {
    let catalog_man = engine.engine_state().catalog_manager();

    let (ts_col_name, time_window_expr, expected_time_unit, df_schema) =
        find_time_window_expr(plan, catalog_man.clone(), query_ctx).await?;

    let aligned_current = current.convert_to(expected_time_unit).with_context(|| {
        UnexpectedSnafu {
            reason: format!(
                "Failed to cast current timestamp {current:?} to {expected_time_unit}"
            ),
        }
    })?;

    let Some(window_expr) = time_window_expr else {
        return Ok((ts_col_name, None, None));
    };

    let session_state = engine.engine_state().session_state();
    let planned = to_phy_expr(&window_expr, &df_schema, &session_state)?;

    let lower = calc_expr_time_window_lower_bound(&planned, &df_schema, aligned_current)?;
    let upper = probe_expr_time_window_upper_bound(&planned, &df_schema, aligned_current)?;

    Ok((ts_col_name, lower, upper))
}
467
468fn calc_expr_time_window_lower_bound(
477 phy_expr: &PhysicalExprRef,
478 df_schema: &DFSchema,
479 current: Timestamp,
480) -> Result<Option<Timestamp>, Error> {
481 let cur_time_window = eval_phy_time_window_expr(phy_expr, df_schema, current)?;
482 let input_time_unit = cur_time_window.unit();
483 Ok(cur_time_window.convert_to(input_time_unit))
484}
485
486fn probe_expr_time_window_upper_bound(
488 phy_expr: &PhysicalExprRef,
489 df_schema: &DFSchema,
490 current: Timestamp,
491) -> Result<Option<Timestamp>, Error> {
492 use std::cmp::Ordering;
494
495 let cur_time_window = eval_phy_time_window_expr(phy_expr, df_schema, current)?;
496
497 let mut offset: i64 = 1;
499 let mut lower_bound = Some(current);
500 let upper_bound;
501 loop {
503 let Some(next_val) = current.value().checked_add(offset) else {
504 return Ok(None);
506 };
507
508 let next_time_probe = common_time::Timestamp::new(next_val, current.unit());
509
510 let next_time_window = eval_phy_time_window_expr(phy_expr, df_schema, next_time_probe)?;
511
512 match next_time_window.cmp(&cur_time_window) {
513 Ordering::Less => UnexpectedSnafu {
514 reason: format!(
515 "Unsupported time window expression, expect monotonic increasing for time window expression {phy_expr:?}"
516 ),
517 }
518 .fail()?,
519 Ordering::Equal => {
520 lower_bound = Some(next_time_probe);
521 }
522 Ordering::Greater => {
523 upper_bound = Some(next_time_probe);
524 break
525 }
526 }
527
528 let Some(new_offset) = offset.checked_mul(2) else {
529 return Ok(None);
531 };
532 offset = new_offset;
533 }
534
535 binary_search_expr(
538 lower_bound,
539 upper_bound,
540 cur_time_window,
541 phy_expr,
542 df_schema,
543 )
544 .map(Some)
545}
546
547fn binary_search_expr(
548 lower_bound: Option<Timestamp>,
549 upper_bound: Option<Timestamp>,
550 cur_time_window: Timestamp,
551 phy_expr: &PhysicalExprRef,
552 df_schema: &DFSchema,
553) -> Result<Timestamp, Error> {
554 ensure!(lower_bound.map(|v|v.unit()) == upper_bound.map(|v| v.unit()), UnexpectedSnafu {
555 reason: format!(" unit mismatch for time window expression {phy_expr:?}, found {lower_bound:?} and {upper_bound:?}"),
556 });
557
558 let output_unit = upper_bound
559 .context(UnexpectedSnafu {
560 reason: "should have lower bound",
561 })?
562 .unit();
563
564 let mut low = lower_bound
565 .context(UnexpectedSnafu {
566 reason: "should have lower bound",
567 })?
568 .value();
569 let mut high = upper_bound
570 .context(UnexpectedSnafu {
571 reason: "should have upper bound",
572 })?
573 .value();
574 while low < high {
575 let mid = (low + high) / 2;
576 let mid_probe = common_time::Timestamp::new(mid, output_unit);
577 let mid_time_window = eval_phy_time_window_expr(phy_expr, df_schema, mid_probe)?;
578
579 match mid_time_window.cmp(&cur_time_window) {
580 std::cmp::Ordering::Less => UnexpectedSnafu {
581 reason: format!("Binary search failed for time window expression {phy_expr:?}"),
582 }
583 .fail()?,
584 std::cmp::Ordering::Equal => low = mid + 1,
585 std::cmp::Ordering::Greater => high = mid,
586 }
587 }
588
589 let final_upper_bound_for_time_window = common_time::Timestamp::new(high, output_unit);
590 Ok(final_upper_bound_for_time_window)
591}
592
593fn eval_phy_time_window_expr(
595 phy: &PhysicalExprRef,
596 df_schema: &DFSchema,
597 input_value: Timestamp,
598) -> Result<Timestamp, Error> {
599 let schema_ty = df_schema.field(0).data_type();
600 let schema_cdt = ConcreteDataType::from_arrow_type(schema_ty);
601 let schema_unit = if let ConcreteDataType::Timestamp(ts) = schema_cdt {
602 ts.unit()
603 } else {
604 return UnexpectedSnafu {
605 reason: format!("Expect Timestamp, found {:?}", schema_cdt),
606 }
607 .fail();
608 };
609 let input_value = input_value
610 .convert_to(schema_unit)
611 .with_context(|| UnexpectedSnafu {
612 reason: format!("Failed to convert timestamp {input_value:?} to {schema_unit}"),
613 })?;
614 let ts_vector = match schema_unit {
615 TimeUnit::Second => {
616 TimestampSecondVector::from_vec(vec![input_value.value()]).to_arrow_array()
617 }
618 TimeUnit::Millisecond => {
619 TimestampMillisecondVector::from_vec(vec![input_value.value()]).to_arrow_array()
620 }
621 TimeUnit::Microsecond => {
622 TimestampMicrosecondVector::from_vec(vec![input_value.value()]).to_arrow_array()
623 }
624 TimeUnit::Nanosecond => {
625 TimestampNanosecondVector::from_vec(vec![input_value.value()]).to_arrow_array()
626 }
627 };
628
629 let rb = DfRecordBatch::try_new(df_schema.inner().clone(), vec![ts_vector.clone()])
630 .with_context(|_| ArrowSnafu {
631 context: format!("Failed to create record batch from {df_schema:?} and {ts_vector:?}"),
632 })?;
633
634 let eval_res = phy.evaluate(&rb).with_context(|_| DatafusionSnafu {
635 context: format!("Failed to evaluate physical expression {phy:?} on {rb:?}"),
636 })?;
637
638 if let Some(Some(ts)) = columnar_to_ts_vector(&eval_res)?.first() {
639 Ok(*ts)
640 } else {
641 UnexpectedSnafu {
642 reason: format!(
643 "Expected timestamp in expression {phy:?} but got {:?}",
644 eval_res
645 ),
646 }
647 .fail()?
648 }
649}
650
651fn to_phy_expr(
652 expr: &Expr,
653 df_schema: &DFSchema,
654 session: &SessionState,
655) -> Result<PhysicalExprRef, Error> {
656 let phy_planner = DefaultPhysicalPlanner::default();
657
658 let phy_expr: PhysicalExprRef = phy_planner
659 .create_physical_expr(expr, df_schema, session)
660 .with_context(|_e| DatafusionSnafu {
661 context: format!(
662 "Failed to create physical expression from {expr:?} using {df_schema:?}"
663 ),
664 })?;
665 Ok(phy_expr)
666}
667
#[cfg(test)]
mod test {
    use datafusion_common::tree_node::TreeNode;
    use pretty_assertions::assert_eq;
    use session::context::QueryContext;

    use super::*;
    use crate::batching_mode::utils::{df_plan_to_sql, sql_to_df_plan, AddFilterRewriter};
    use crate::test_utils::create_test_query_engine;

    /// For each query, checks the bounds returned by `find_plan_time_window_bound`.
    /// Then adds `col >= lower AND col <= upper` to the plan via `AddFilterRewriter`
    /// and compares the unparsed SQL with the expected string.
    #[tokio::test]
    async fn test_plan_time_window_lower_bound() {
        use datafusion_expr::{col, lit};
        let query_engine = create_test_query_engine();
        let ctx = QueryContext::arc();

        // Each case: (sql, current, expected (column, lower, upper), expected SQL with filter).
        let testcases = [
            (
                // `GROUP BY ts` resolves to the raw column, so the window is 1 ms wide
                "SELECT arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)') AS ts FROM numbers_with_ts GROUP BY ts;",
                Timestamp::new(1740394109, TimeUnit::Second),
                (
                    "ts".to_string(),
                    Some(Timestamp::new(1740394109000, TimeUnit::Millisecond)),
                    Some(Timestamp::new(1740394109001, TimeUnit::Millisecond)),
                ),
                r#"SELECT arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)') AS time_window FROM numbers_with_ts WHERE ((ts >= CAST('2025-02-24 10:48:29' AS TIMESTAMP)) AND (ts <= CAST('2025-02-24 10:48:29.001' AS TIMESTAMP))) GROUP BY numbers_with_ts.ts"#
            ),
            (
                // grouping by a 1-minute bucket cast to seconds
                "SELECT arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)') AS time_window FROM numbers_with_ts GROUP BY time_window;",
                Timestamp::new(1740394109, TimeUnit::Second),
                (
                    "ts".to_string(),
                    Some(Timestamp::new(1740394080, TimeUnit::Second)),
                    Some(Timestamp::new(1740394140, TimeUnit::Second)),
                ),
                "SELECT arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)') AS time_window FROM numbers_with_ts WHERE ((ts >= CAST('2025-02-24 10:48:00' AS TIMESTAMP)) AND (ts <= CAST('2025-02-24 10:49:00' AS TIMESTAMP))) GROUP BY arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)')"
            ),
            (
                // the new filter is ANDed with an existing IN predicate
                "SELECT arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)') AS time_window FROM numbers_with_ts WHERE number in (2, 3, 4) GROUP BY time_window;",
                Timestamp::new(1740394109, TimeUnit::Second),
                (
                    "ts".to_string(),
                    Some(Timestamp::new(1740394080, TimeUnit::Second)),
                    Some(Timestamp::new(1740394140, TimeUnit::Second)),
                ),
                "SELECT arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)') AS time_window FROM numbers_with_ts WHERE numbers_with_ts.number IN (2, 3, 4) AND ((ts >= CAST('2025-02-24 10:48:00' AS TIMESTAMP)) AND (ts <= CAST('2025-02-24 10:49:00' AS TIMESTAMP))) GROUP BY arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)')"
            ),
            (
                // the new filter is ANDed with an existing BETWEEN predicate
                "SELECT arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)') AS time_window FROM numbers_with_ts WHERE number BETWEEN 2 AND 4 GROUP BY time_window;",
                Timestamp::new(1740394109, TimeUnit::Second),
                (
                    "ts".to_string(),
                    Some(Timestamp::new(1740394080, TimeUnit::Second)),
                    Some(Timestamp::new(1740394140, TimeUnit::Second)),
                ),
                "SELECT arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)') AS time_window FROM numbers_with_ts WHERE (numbers_with_ts.number BETWEEN 2 AND 4) AND ((ts >= CAST('2025-02-24 10:48:00' AS TIMESTAMP)) AND (ts <= CAST('2025-02-24 10:49:00' AS TIMESTAMP))) GROUP BY arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)')"
            ),
            (
                // no GROUP BY: no time window expression, SQL left unchanged
                "SELECT date_bin('5 minutes', ts) FROM numbers_with_ts;",
                Timestamp::new(23, TimeUnit::Millisecond),
                ("ts".to_string(), None, None),
                "SELECT date_bin('5 minutes', ts) FROM numbers_with_ts;"
            ),
            (
                // `current` given in nanoseconds, finer than the column's unit
                "SELECT date_bin('5 minutes', ts) as time_window FROM numbers_with_ts GROUP BY time_window;",
                Timestamp::new(23, TimeUnit::Nanosecond),
                (
                    "ts".to_string(),
                    Some(Timestamp::new(0, TimeUnit::Millisecond)),
                    Some(Timestamp::new(300000, TimeUnit::Millisecond)),
                ),
                "SELECT date_bin('5 minutes', numbers_with_ts.ts) AS time_window FROM numbers_with_ts WHERE ((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts <= CAST('1970-01-01 00:05:00' AS TIMESTAMP))) GROUP BY date_bin('5 minutes', numbers_with_ts.ts)"
            ),
            (
                // `current` exactly at the window start
                "SELECT date_bin('5 minutes', ts) as time_window FROM numbers_with_ts GROUP BY time_window;",
                Timestamp::new(0, TimeUnit::Nanosecond),
                (
                    "ts".to_string(),
                    Some(Timestamp::new(0, TimeUnit::Millisecond)),
                    Some(Timestamp::new(300000, TimeUnit::Millisecond)),
                ),
                "SELECT date_bin('5 minutes', numbers_with_ts.ts) AS time_window FROM numbers_with_ts WHERE ((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts <= CAST('1970-01-01 00:05:00' AS TIMESTAMP))) GROUP BY date_bin('5 minutes', numbers_with_ts.ts)"
            ),
            (
                // nanosecond `current` that converts exactly to 23 ms
                "SELECT date_bin('5 minutes', ts) as time_window FROM numbers_with_ts GROUP BY time_window;",
                Timestamp::new(23_000_000, TimeUnit::Nanosecond),
                (
                    "ts".to_string(),
                    Some(Timestamp::new(0, TimeUnit::Millisecond)),
                    Some(Timestamp::new(300000, TimeUnit::Millisecond)),
                ),
                "SELECT date_bin('5 minutes', numbers_with_ts.ts) AS time_window FROM numbers_with_ts WHERE ((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts <= CAST('1970-01-01 00:05:00' AS TIMESTAMP))) GROUP BY date_bin('5 minutes', numbers_with_ts.ts)"
            ),
            (
                // an aggregate function alongside the time window
                "SELECT sum(number) as sum_up, date_bin('5 minutes', ts) as time_window FROM numbers_with_ts GROUP BY time_window;",
                Timestamp::new(23, TimeUnit::Millisecond),
                (
                    "ts".to_string(),
                    Some(Timestamp::new(0, TimeUnit::Millisecond)),
                    Some(Timestamp::new(300000, TimeUnit::Millisecond)),
                ),
                "SELECT sum(numbers_with_ts.number) AS sum_up, date_bin('5 minutes', numbers_with_ts.ts) AS time_window FROM numbers_with_ts WHERE ((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts <= CAST('1970-01-01 00:05:00' AS TIMESTAMP))) GROUP BY date_bin('5 minutes', numbers_with_ts.ts)"
            ),
            (
                // multiple group keys; only the time-index one is selected
                "SELECT number, date_bin('5 minutes', ts) as time_window FROM numbers_with_ts GROUP BY time_window, number;",
                Timestamp::new(23, TimeUnit::Millisecond),
                (
                    "ts".to_string(),
                    Some(Timestamp::new(0, TimeUnit::Millisecond)),
                    Some(Timestamp::new(300000, TimeUnit::Millisecond)),
                ),
                "SELECT numbers_with_ts.number, date_bin('5 minutes', numbers_with_ts.ts) AS time_window FROM numbers_with_ts WHERE ((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts <= CAST('1970-01-01 00:05:00' AS TIMESTAMP))) GROUP BY date_bin('5 minutes', numbers_with_ts.ts), numbers_with_ts.number"
            ),
            (
                // aggregate inside a subquery
                "SELECT number, time_window FROM (SELECT number, date_bin('5 minutes', ts) as time_window FROM numbers_with_ts GROUP BY time_window, number);",
                Timestamp::new(23, TimeUnit::Millisecond),
                (
                    "ts".to_string(),
                    Some(Timestamp::new(0, TimeUnit::Millisecond)),
                    Some(Timestamp::new(300000, TimeUnit::Millisecond)),
                ),
                "SELECT numbers_with_ts.number, time_window FROM (SELECT numbers_with_ts.number, date_bin('5 minutes', numbers_with_ts.ts) AS time_window FROM numbers_with_ts WHERE ((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts <= CAST('1970-01-01 00:05:00' AS TIMESTAMP))) GROUP BY date_bin('5 minutes', numbers_with_ts.ts), numbers_with_ts.number)"
            ),
            (
                // aggregate inside a CTE
                "with cte as (select number, date_bin('5 minutes', ts) as time_window from numbers_with_ts GROUP BY time_window, number) select number, time_window from cte;",
                Timestamp::new(23, TimeUnit::Millisecond),
                (
                    "ts".to_string(),
                    Some(Timestamp::new(0, TimeUnit::Millisecond)),
                    Some(Timestamp::new(300000, TimeUnit::Millisecond)),
                ),
                "SELECT cte.number, cte.time_window FROM (SELECT numbers_with_ts.number, date_bin('5 minutes', numbers_with_ts.ts) AS time_window FROM numbers_with_ts WHERE ((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts <= CAST('1970-01-01 00:05:00' AS TIMESTAMP))) GROUP BY date_bin('5 minutes', numbers_with_ts.ts), numbers_with_ts.number) AS cte"
            ),
            (
                // aggregate over an unaliased subquery with a computed column
                "SELECT sum(number), number, date_bin('5 minutes', ts) as time_window, bucket_name FROM (SELECT number, ts, case when number < 5 THEN 'bucket_0_5' when number >= 5 THEN 'bucket_5_inf' END as bucket_name FROM numbers_with_ts) GROUP BY number, time_window, bucket_name;",
                Timestamp::new(23, TimeUnit::Millisecond),
                (
                    "ts".to_string(),
                    Some(Timestamp::new(0, TimeUnit::Millisecond)),
                    Some(Timestamp::new(300000, TimeUnit::Millisecond)),
                ),
                "SELECT sum(numbers_with_ts.number), numbers_with_ts.number, date_bin('5 minutes', numbers_with_ts.ts) AS time_window, bucket_name FROM (SELECT numbers_with_ts.number, numbers_with_ts.ts, CASE WHEN (numbers_with_ts.number < 5) THEN 'bucket_0_5' WHEN (numbers_with_ts.number >= 5) THEN 'bucket_5_inf' END AS bucket_name FROM numbers_with_ts WHERE ((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts <= CAST('1970-01-01 00:05:00' AS TIMESTAMP)))) GROUP BY numbers_with_ts.number, date_bin('5 minutes', numbers_with_ts.ts), bucket_name"
            ),
            (
                // same as above but the subquery is aliased
                "SELECT sum(number), number, date_bin('5 minutes', ts) as time_window, bucket_name FROM (SELECT number, ts, case when number < 5 THEN 'bucket_0_5' when number >= 5 THEN 'bucket_5_inf' END as bucket_name FROM numbers_with_ts) as cte GROUP BY number, time_window, bucket_name;",
                Timestamp::new(23, TimeUnit::Millisecond),
                (
                    "ts".to_string(),
                    Some(Timestamp::new(0, TimeUnit::Millisecond)),
                    Some(Timestamp::new(300000, TimeUnit::Millisecond)),
                ),
                "SELECT sum(cte.number), cte.number, date_bin('5 minutes', cte.ts) AS time_window, cte.bucket_name FROM (SELECT numbers_with_ts.number, numbers_with_ts.ts, CASE WHEN (numbers_with_ts.number < 5) THEN 'bucket_0_5' WHEN (numbers_with_ts.number >= 5) THEN 'bucket_5_inf' END AS bucket_name FROM numbers_with_ts WHERE ((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts <= CAST('1970-01-01 00:05:00' AS TIMESTAMP)))) AS cte GROUP BY cte.number, date_bin('5 minutes', cte.ts), cte.bucket_name"
            ),
        ];

        for (sql, current, expected, expected_unparsed) in testcases {
            // The `true` flag is passed when computing bounds and `false` when
            // re-planning for unparsing. NOTE(review): its exact meaning is defined by
            // `sql_to_df_plan` elsewhere — confirm there.
            let plan = sql_to_df_plan(ctx.clone(), query_engine.clone(), sql, true)
                .await
                .unwrap();

            let real =
                find_plan_time_window_bound(&plan, current, ctx.clone(), query_engine.clone())
                    .await
                    .unwrap();
            assert_eq!(expected, real);

            let plan = sql_to_df_plan(ctx.clone(), query_engine.clone(), sql, false)
                .await
                .unwrap();
            let (col_name, lower, upper) = real;
            // Only inject a filter when a window was found; otherwise the SQL is
            // expected to be unchanged.
            let new_sql = if lower.is_some() {
                let to_df_literal = |value| {
                    let value = Value::from(value);

                    value.try_to_scalar_value(&value.data_type()).unwrap()
                };
                let lower = to_df_literal(lower.unwrap());
                let upper = to_df_literal(upper.unwrap());
                let expr = col(&col_name)
                    .gt_eq(lit(lower))
                    .and(col(&col_name).lt_eq(lit(upper)));
                let mut add_filter = AddFilterRewriter::new(expr);
                let plan = plan.rewrite(&mut add_filter).unwrap().data;
                df_plan_to_sql(&plan).unwrap()
            } else {
                sql.to_string()
            };
            assert_eq!(expected_unparsed, new_sql);
        }
    }
}