1use std::collections::BTreeSet;
19use std::sync::Arc;
20
21use api::helper::pb_value_to_value_ref;
22use arrow::array::{
23 TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
24 TimestampSecondArray,
25};
26use catalog::CatalogManagerRef;
27use common_error::ext::BoxedError;
28use common_recordbatch::DfRecordBatch;
29use common_telemetry::warn;
30use common_time::timestamp::TimeUnit;
31use common_time::Timestamp;
32use datafusion::error::Result as DfResult;
33use datafusion::execution::SessionState;
34use datafusion::logical_expr::Expr;
35use datafusion::physical_planner::{DefaultPhysicalPlanner, PhysicalPlanner};
36use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRecursion, TreeNodeRewriter};
37use datafusion_common::{DFSchema, TableReference};
38use datafusion_expr::{ColumnarValue, LogicalPlan};
39use datafusion_physical_expr::PhysicalExprRef;
40use datatypes::prelude::{ConcreteDataType, DataType};
41use datatypes::schema::TIME_INDEX_KEY;
42use datatypes::value::Value;
43use datatypes::vectors::{
44 TimestampMicrosecondVector, TimestampMillisecondVector, TimestampNanosecondVector,
45 TimestampSecondVector, Vector,
46};
47use itertools::Itertools;
48use session::context::QueryContextRef;
49use snafu::{ensure, OptionExt, ResultExt};
50
51use crate::adapter::util::from_proto_to_data_type;
52use crate::error::{
53 ArrowSnafu, DatafusionSnafu, DatatypesSnafu, ExternalSnafu, PlanSnafu, UnexpectedSnafu,
54};
55use crate::expr::error::DataTypeSnafu;
56use crate::Error;
57
/// Probe timestamp (1_700_000_000 in `TimeUnit::Second`) that `TimeWindowExpr::from_expr`
/// evaluates the time-window expression at, in order to measure the width of one window.
const DEFAULT_TEST_TIMESTAMP: Timestamp = Timestamp::new_second(17_0000_0000);
60
/// A time-window expression over a single timestamp column (for example a `date_bin(...)`
/// group key), kept both as the original logical expression and as a planned physical
/// expression that can be evaluated directly on record batches.
#[derive(Debug, Clone)]
pub struct TimeWindowExpr {
    /// Physical form of `logical_expr`, planned against `df_schema`.
    phy_expr: PhysicalExprRef,
    /// Name of the timestamp column the expression reads; `handle_rows` locates the input
    /// column by this name.
    pub column_name: String,
    /// The logical expression this was built from (kept for display/diagnostics).
    logical_expr: Expr,
    /// Schema the physical expression was planned against.
    df_schema: DFSchema,
    /// Width of one time window as measured by `from_expr`; `None` when either probed bound
    /// was unavailable or their difference could not be computed.
    eval_time_window_size: Option<std::time::Duration>,
}
78
79impl std::fmt::Display for TimeWindowExpr {
80 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
81 f.debug_struct("TimeWindowExpr")
82 .field("phy_expr", &self.phy_expr.to_string())
83 .field("column_name", &self.column_name)
84 .field("logical_expr", &self.logical_expr.to_string())
85 .field("df_schema", &self.df_schema)
86 .finish()
87 }
88}
89
90impl TimeWindowExpr {
91 pub fn time_window_size(&self) -> &Option<std::time::Duration> {
93 &self.eval_time_window_size
94 }
95
96 pub fn from_expr(
97 expr: &Expr,
98 column_name: &str,
99 df_schema: &DFSchema,
100 session: &SessionState,
101 ) -> Result<Self, Error> {
102 let phy_expr: PhysicalExprRef = to_phy_expr(expr, df_schema, session)?;
103 let mut zelf = Self {
104 phy_expr,
105 column_name: column_name.to_string(),
106 logical_expr: expr.clone(),
107 df_schema: df_schema.clone(),
108 eval_time_window_size: None,
109 };
110 let test_ts = DEFAULT_TEST_TIMESTAMP;
111 let (l, u) = zelf.eval(test_ts)?;
112 let time_window_size = match (l, u) {
113 (Some(l), Some(u)) => u.sub(&l).map(|r| r.to_std()).transpose().map_err(|_| {
114 UnexpectedSnafu {
115 reason: format!(
116 "Expect upper bound older than lower bound, found upper={u:?} and lower={l:?}"
117 ),
118 }
119 .build()
120 })?,
121 _ => None,
122 };
123 zelf.eval_time_window_size = time_window_size;
124 Ok(zelf)
125 }
126
127 pub fn eval(
128 &self,
129 current: Timestamp,
130 ) -> Result<(Option<Timestamp>, Option<Timestamp>), Error> {
131 let lower_bound =
132 calc_expr_time_window_lower_bound(&self.phy_expr, &self.df_schema, current)?;
133 let upper_bound =
134 probe_expr_time_window_upper_bound(&self.phy_expr, &self.df_schema, current)?;
135 Ok((lower_bound, upper_bound))
136 }
137
138 pub async fn handle_rows(
142 &self,
143 rows_list: Vec<api::v1::Rows>,
144 ) -> Result<BTreeSet<Timestamp>, Error> {
145 let mut time_windows = BTreeSet::new();
146
147 for rows in rows_list {
148 let ts_col_index = rows
151 .schema
152 .iter()
153 .map(|col| col.column_name.clone())
154 .position(|name| name == self.column_name);
155 let Some(ts_col_index) = ts_col_index else {
156 warn!("can't found time index column in schema: {:?}", rows.schema);
157 continue;
158 };
159 let col_schema = &rows.schema[ts_col_index];
160 let cdt = from_proto_to_data_type(col_schema)?;
161
162 let mut vector = cdt.create_mutable_vector(rows.rows.len());
163 for row in rows.rows {
164 let value = pb_value_to_value_ref(&row.values[ts_col_index], &None);
165 vector.try_push_value_ref(value).context(DataTypeSnafu {
166 msg: "Failed to convert rows to columns",
167 })?;
168 }
169 let vector = vector.to_vector();
170
171 let df_schema = create_df_schema_for_ts_column(&self.column_name, cdt)?;
172
173 let rb =
174 DfRecordBatch::try_new(df_schema.inner().clone(), vec![vector.to_arrow_array()])
175 .with_context(|_e| ArrowSnafu {
176 context: format!(
177 "Failed to create record batch from {df_schema:?} and {vector:?}"
178 ),
179 })?;
180
181 let eval_res = self
182 .phy_expr
183 .evaluate(&rb)
184 .with_context(|_| DatafusionSnafu {
185 context: format!(
186 "Failed to evaluate physical expression {:?} on {rb:?}",
187 self.phy_expr
188 ),
189 })?;
190
191 let res = columnar_to_ts_vector(&eval_res)?;
192
193 for ts in res.into_iter().flatten() {
194 time_windows.insert(ts);
195 }
196 }
197
198 Ok(time_windows)
199 }
200}
201
202fn create_df_schema_for_ts_column(name: &str, cdt: ConcreteDataType) -> Result<DFSchema, Error> {
203 let arrow_schema = Arc::new(arrow_schema::Schema::new(vec![arrow_schema::Field::new(
204 name,
205 cdt.as_arrow_type(),
206 false,
207 )]));
208
209 let df_schema = DFSchema::from_field_specific_qualified_schema(
210 vec![Some(TableReference::bare("TimeIndexOnlyTable"))],
211 &arrow_schema,
212 )
213 .with_context(|_e| DatafusionSnafu {
214 context: format!("Failed to create DFSchema from arrow schema {arrow_schema:?}"),
215 })?;
216
217 Ok(df_schema)
218}
219
220fn columnar_to_ts_vector(columnar: &ColumnarValue) -> Result<Vec<Option<Timestamp>>, Error> {
222 let val = match columnar {
223 datafusion_expr::ColumnarValue::Array(array) => {
224 let ty = array.data_type();
225 let ty = ConcreteDataType::from_arrow_type(ty);
226 let time_unit = if let ConcreteDataType::Timestamp(ty) = ty {
227 ty.unit()
228 } else {
229 return UnexpectedSnafu {
230 reason: format!("Non-timestamp type: {ty:?}"),
231 }
232 .fail();
233 };
234
235 match time_unit {
236 TimeUnit::Second => array
237 .as_ref()
238 .as_any()
239 .downcast_ref::<TimestampSecondArray>()
240 .with_context(|| PlanSnafu {
241 reason: format!("Failed to create vector from arrow array {array:?}"),
242 })?
243 .values()
244 .iter()
245 .map(|d| Some(Timestamp::new(*d, time_unit)))
246 .collect_vec(),
247 TimeUnit::Millisecond => array
248 .as_ref()
249 .as_any()
250 .downcast_ref::<TimestampMillisecondArray>()
251 .with_context(|| PlanSnafu {
252 reason: format!("Failed to create vector from arrow array {array:?}"),
253 })?
254 .values()
255 .iter()
256 .map(|d| Some(Timestamp::new(*d, time_unit)))
257 .collect_vec(),
258 TimeUnit::Microsecond => array
259 .as_ref()
260 .as_any()
261 .downcast_ref::<TimestampMicrosecondArray>()
262 .with_context(|| PlanSnafu {
263 reason: format!("Failed to create vector from arrow array {array:?}"),
264 })?
265 .values()
266 .iter()
267 .map(|d| Some(Timestamp::new(*d, time_unit)))
268 .collect_vec(),
269 TimeUnit::Nanosecond => array
270 .as_ref()
271 .as_any()
272 .downcast_ref::<TimestampNanosecondArray>()
273 .with_context(|| PlanSnafu {
274 reason: format!("Failed to create vector from arrow array {array:?}"),
275 })?
276 .values()
277 .iter()
278 .map(|d| Some(Timestamp::new(*d, time_unit)))
279 .collect_vec(),
280 }
281 }
282 datafusion_expr::ColumnarValue::Scalar(scalar) => {
283 let value = Value::try_from(scalar.clone()).with_context(|_| DatatypesSnafu {
284 extra: format!("Failed to convert scalar {scalar:?} to value"),
285 })?;
286 let ts = value.as_timestamp().context(UnexpectedSnafu {
287 reason: format!("Expect Timestamp, found {:?}", value),
288 })?;
289 vec![Some(ts)]
290 }
291 };
292 Ok(val)
293}
294
295pub async fn find_time_window_expr(
302 plan: &LogicalPlan,
303 catalog_man: CatalogManagerRef,
304 query_ctx: QueryContextRef,
305) -> Result<(String, Option<datafusion_expr::Expr>, TimeUnit, DFSchema), Error> {
306 let mut table_name = None;
309
310 plan.apply(|plan| {
312 let LogicalPlan::TableScan(table_scan) = plan else {
313 return Ok(TreeNodeRecursion::Continue);
314 };
315 table_name = Some(table_scan.table_name.clone());
316 Ok(TreeNodeRecursion::Stop)
317 })
318 .with_context(|_| DatafusionSnafu {
319 context: format!("Can't find table source in plan {plan:?}"),
320 })?;
321 let Some(table_name) = table_name else {
322 UnexpectedSnafu {
323 reason: format!("Can't find table source in plan {plan:?}"),
324 }
325 .fail()?
326 };
327
328 let current_schema = query_ctx.current_schema();
329
330 let catalog_name = table_name.catalog().unwrap_or(query_ctx.current_catalog());
331 let schema_name = table_name.schema().unwrap_or(¤t_schema);
332 let table_name = table_name.table();
333
334 let Some(table_ref) = catalog_man
335 .table(catalog_name, schema_name, table_name, Some(&query_ctx))
336 .await
337 .map_err(BoxedError::new)
338 .context(ExternalSnafu)?
339 else {
340 UnexpectedSnafu {
341 reason: format!(
342 "Can't find table {table_name:?} in catalog {catalog_name:?}/{schema_name:?}"
343 ),
344 }
345 .fail()?
346 };
347
348 let schema = &table_ref.table_info().meta.schema;
349
350 let ts_index = schema.timestamp_column().with_context(|| UnexpectedSnafu {
351 reason: format!("Can't find timestamp column in table {table_name:?}"),
352 })?;
353
354 let ts_col_name = ts_index.name.clone();
355
356 let expected_time_unit = ts_index.data_type.as_timestamp().with_context(|| UnexpectedSnafu {
357 reason: format!(
358 "Expected timestamp column {ts_col_name:?} in table {table_name:?} to be timestamp, but got {ts_index:?}"
359 ),
360 })?.unit();
361
362 let arrow_schema = Arc::new(arrow_schema::Schema::new(vec![arrow_schema::Field::new(
363 ts_col_name.clone(),
364 ts_index.data_type.as_arrow_type(),
365 false,
366 )]));
367
368 let df_schema = DFSchema::from_field_specific_qualified_schema(
369 vec![Some(TableReference::bare(table_name))],
370 &arrow_schema,
371 )
372 .with_context(|_e| DatafusionSnafu {
373 context: format!("Failed to create DFSchema from arrow schema {arrow_schema:?}"),
374 })?;
375
376 let mut aggr_expr = None;
378 let mut time_window_expr: Option<Expr> = None;
379
380 let find_inner_aggr_expr = |plan: &LogicalPlan| {
381 if let LogicalPlan::Aggregate(aggregate) = plan {
382 aggr_expr = Some(aggregate.clone());
383 };
384
385 Ok(TreeNodeRecursion::Continue)
386 };
387 plan.apply(find_inner_aggr_expr)
388 .with_context(|_| DatafusionSnafu {
389 context: format!("Can't find aggr expr in plan {plan:?}"),
390 })?;
391
392 if let Some(aggregate) = aggr_expr {
393 for group_expr in &aggregate.group_expr {
394 let refs = group_expr.column_refs();
395 if refs.len() != 1 {
396 continue;
397 }
398 let ref_col = refs.iter().next().unwrap();
399
400 let index = aggregate.input.schema().maybe_index_of_column(ref_col);
401 let Some(index) = index else {
402 continue;
403 };
404 let field = aggregate.input.schema().field(index);
405
406 let is_time_index =
408 field.metadata().get(TIME_INDEX_KEY).map(|s| s.as_str()) == Some("true");
409
410 if is_time_index {
411 let rewrite_column = group_expr.clone();
412 let rewritten = rewrite_column
413 .rewrite(&mut RewriteColumn {
414 table_name: table_name.to_string(),
415 })
416 .with_context(|_| DatafusionSnafu {
417 context: format!("Rewrite expr failed, expr={:?}", group_expr),
418 })?
419 .data;
420 struct RewriteColumn {
421 table_name: String,
422 }
423
424 impl TreeNodeRewriter for RewriteColumn {
425 type Node = Expr;
426 fn f_down(&mut self, node: Self::Node) -> DfResult<Transformed<Self::Node>> {
427 let Expr::Column(mut column) = node else {
428 return Ok(Transformed::no(node));
429 };
430
431 column.relation = Some(TableReference::bare(self.table_name.clone()));
432
433 Ok(Transformed::yes(Expr::Column(column)))
434 }
435 }
436
437 time_window_expr = Some(rewritten);
438 break;
439 }
440 }
441 Ok((ts_col_name, time_window_expr, expected_time_unit, df_schema))
442 } else {
443 Ok((ts_col_name, None, expected_time_unit, df_schema))
445 }
446}
447
#[cfg(test)]
/// Test helper: finds the time window of `plan` that contains `current`.
///
/// `current` is first converted to the time index column's unit; that conversion failing is
/// an error even when the plan has no time-window expression. Returns
/// `(time index column, lower bound, upper bound)`, with both bounds `None` when no
/// time-window expression exists.
pub async fn find_plan_time_window_bound(
    plan: &LogicalPlan,
    current: Timestamp,
    query_ctx: QueryContextRef,
    engine: query::QueryEngineRef,
) -> Result<(String, Option<Timestamp>, Option<Timestamp>), Error> {
    let catalog_man = engine.engine_state().catalog_manager();

    let (ts_col_name, time_window_expr, expected_time_unit, df_schema) =
        find_time_window_expr(plan, catalog_man.clone(), query_ctx).await?;

    let probe_at = current
        .convert_to(expected_time_unit)
        .with_context(|| UnexpectedSnafu {
            reason: format!("Failed to cast current timestamp {current:?} to {expected_time_unit}"),
        })?;

    let Some(window_expr) = time_window_expr else {
        return Ok((ts_col_name, None, None));
    };

    let phy_expr = to_phy_expr(
        &window_expr,
        &df_schema,
        &engine.engine_state().session_state(),
    )?;
    let lower = calc_expr_time_window_lower_bound(&phy_expr, &df_schema, probe_at)?;
    let upper = probe_expr_time_window_upper_bound(&phy_expr, &df_schema, probe_at)?;
    Ok((ts_col_name, lower, upper))
}
492
493fn calc_expr_time_window_lower_bound(
502 phy_expr: &PhysicalExprRef,
503 df_schema: &DFSchema,
504 current: Timestamp,
505) -> Result<Option<Timestamp>, Error> {
506 let cur_time_window = eval_phy_time_window_expr(phy_expr, df_schema, current)?;
507 let input_time_unit = cur_time_window.unit();
508 Ok(cur_time_window.convert_to(input_time_unit))
509}
510
511fn probe_expr_time_window_upper_bound(
513 phy_expr: &PhysicalExprRef,
514 df_schema: &DFSchema,
515 current: Timestamp,
516) -> Result<Option<Timestamp>, Error> {
517 use std::cmp::Ordering;
519
520 let cur_time_window = eval_phy_time_window_expr(phy_expr, df_schema, current)?;
521
522 let mut offset: i64 = 1;
524 let mut lower_bound = Some(current);
525 let upper_bound;
526 loop {
528 let Some(next_val) = current.value().checked_add(offset) else {
529 return Ok(None);
531 };
532
533 let next_time_probe = common_time::Timestamp::new(next_val, current.unit());
534
535 let next_time_window = eval_phy_time_window_expr(phy_expr, df_schema, next_time_probe)?;
536
537 match next_time_window.cmp(&cur_time_window) {
538 Ordering::Less => UnexpectedSnafu {
539 reason: format!(
540 "Unsupported time window expression, expect monotonic increasing for time window expression {phy_expr:?}"
541 ),
542 }
543 .fail()?,
544 Ordering::Equal => {
545 lower_bound = Some(next_time_probe);
546 }
547 Ordering::Greater => {
548 upper_bound = Some(next_time_probe);
549 break
550 }
551 }
552
553 let Some(new_offset) = offset.checked_mul(2) else {
554 return Ok(None);
556 };
557 offset = new_offset;
558 }
559
560 binary_search_expr(
563 lower_bound,
564 upper_bound,
565 cur_time_window,
566 phy_expr,
567 df_schema,
568 )
569 .map(Some)
570}
571
572fn binary_search_expr(
573 lower_bound: Option<Timestamp>,
574 upper_bound: Option<Timestamp>,
575 cur_time_window: Timestamp,
576 phy_expr: &PhysicalExprRef,
577 df_schema: &DFSchema,
578) -> Result<Timestamp, Error> {
579 ensure!(lower_bound.map(|v|v.unit()) == upper_bound.map(|v| v.unit()), UnexpectedSnafu {
580 reason: format!(" unit mismatch for time window expression {phy_expr:?}, found {lower_bound:?} and {upper_bound:?}"),
581 });
582
583 let output_unit = upper_bound
584 .context(UnexpectedSnafu {
585 reason: "should have lower bound",
586 })?
587 .unit();
588
589 let mut low = lower_bound
590 .context(UnexpectedSnafu {
591 reason: "should have lower bound",
592 })?
593 .value();
594 let mut high = upper_bound
595 .context(UnexpectedSnafu {
596 reason: "should have upper bound",
597 })?
598 .value();
599 while low < high {
600 let mid = (low + high) / 2;
601 let mid_probe = common_time::Timestamp::new(mid, output_unit);
602 let mid_time_window = eval_phy_time_window_expr(phy_expr, df_schema, mid_probe)?;
603
604 match mid_time_window.cmp(&cur_time_window) {
605 std::cmp::Ordering::Less => UnexpectedSnafu {
606 reason: format!("Binary search failed for time window expression {phy_expr:?}"),
607 }
608 .fail()?,
609 std::cmp::Ordering::Equal => low = mid + 1,
610 std::cmp::Ordering::Greater => high = mid,
611 }
612 }
613
614 let final_upper_bound_for_time_window = common_time::Timestamp::new(high, output_unit);
615 Ok(final_upper_bound_for_time_window)
616}
617
618fn eval_phy_time_window_expr(
620 phy: &PhysicalExprRef,
621 df_schema: &DFSchema,
622 input_value: Timestamp,
623) -> Result<Timestamp, Error> {
624 let schema_ty = df_schema.field(0).data_type();
625 let schema_cdt = ConcreteDataType::from_arrow_type(schema_ty);
626 let schema_unit = if let ConcreteDataType::Timestamp(ts) = schema_cdt {
627 ts.unit()
628 } else {
629 return UnexpectedSnafu {
630 reason: format!("Expect Timestamp, found {:?}", schema_cdt),
631 }
632 .fail();
633 };
634 let input_value = input_value
635 .convert_to(schema_unit)
636 .with_context(|| UnexpectedSnafu {
637 reason: format!("Failed to convert timestamp {input_value:?} to {schema_unit}"),
638 })?;
639 let ts_vector = match schema_unit {
640 TimeUnit::Second => {
641 TimestampSecondVector::from_vec(vec![input_value.value()]).to_arrow_array()
642 }
643 TimeUnit::Millisecond => {
644 TimestampMillisecondVector::from_vec(vec![input_value.value()]).to_arrow_array()
645 }
646 TimeUnit::Microsecond => {
647 TimestampMicrosecondVector::from_vec(vec![input_value.value()]).to_arrow_array()
648 }
649 TimeUnit::Nanosecond => {
650 TimestampNanosecondVector::from_vec(vec![input_value.value()]).to_arrow_array()
651 }
652 };
653
654 let rb = DfRecordBatch::try_new(df_schema.inner().clone(), vec![ts_vector.clone()])
655 .with_context(|_| ArrowSnafu {
656 context: format!("Failed to create record batch from {df_schema:?} and {ts_vector:?}"),
657 })?;
658
659 let eval_res = phy.evaluate(&rb).with_context(|_| DatafusionSnafu {
660 context: format!("Failed to evaluate physical expression {phy:?} on {rb:?}"),
661 })?;
662
663 if let Some(Some(ts)) = columnar_to_ts_vector(&eval_res)?.first() {
664 Ok(*ts)
665 } else {
666 UnexpectedSnafu {
667 reason: format!(
668 "Expected timestamp in expression {phy:?} but got {:?}",
669 eval_res
670 ),
671 }
672 .fail()?
673 }
674}
675
676fn to_phy_expr(
677 expr: &Expr,
678 df_schema: &DFSchema,
679 session: &SessionState,
680) -> Result<PhysicalExprRef, Error> {
681 let phy_planner = DefaultPhysicalPlanner::default();
682
683 let phy_expr: PhysicalExprRef = phy_planner
684 .create_physical_expr(expr, df_schema, session)
685 .with_context(|_e| DatafusionSnafu {
686 context: format!(
687 "Failed to create physical expression from {expr:?} using {df_schema:?}"
688 ),
689 })?;
690 Ok(phy_expr)
691}
692
/// Tests for time-window discovery and bound computation on SQL plans over the
/// `numbers_with_ts` test table.
#[cfg(test)]
mod test {
    use datafusion_common::tree_node::TreeNode;
    use pretty_assertions::assert_eq;
    use session::context::QueryContext;

    use super::*;
    use crate::batching_mode::utils::{df_plan_to_sql, sql_to_df_plan, AddFilterRewriter};
    use crate::test_utils::create_test_query_engine;

    /// For each case: plan the SQL, compute the time-window bounds for `current`, compare
    /// them with the expected `(column, lower, upper)`, then add a
    /// `column >= lower AND column <= upper` filter to the plan and compare the unparsed SQL.
    #[tokio::test]
    async fn test_plan_time_window_lower_bound() {
        use datafusion_expr::{col, lit};
        let query_engine = create_test_query_engine();
        let ctx = QueryContext::arc();

        // Each case is (sql, current, expected (column, lower, upper), expected SQL after
        // adding the bound filter).
        let testcases = [
            // `GROUP BY ts` groups by the raw `ts` column, not the alias. The window is the
            // identity on `ts`, so the bounds are `current` and `current + 1` in the
            // column's unit (millisecond, judging by the expected values).
            (
                "SELECT arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)') AS ts FROM numbers_with_ts GROUP BY ts;",
                Timestamp::new(1740394109, TimeUnit::Second),
                (
                    "ts".to_string(),
                    Some(Timestamp::new(1740394109000, TimeUnit::Millisecond)),
                    Some(Timestamp::new(1740394109001, TimeUnit::Millisecond)),
                ),
                r#"SELECT arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)') AS ts FROM numbers_with_ts WHERE ((ts >= CAST('2025-02-24 10:48:29' AS TIMESTAMP)) AND (ts <= CAST('2025-02-24 10:48:29.001' AS TIMESTAMP))) GROUP BY numbers_with_ts.ts"#
            ),
            // Grouping by the aliased 1-minute `date_bin` yields the enclosing minute.
            (
                "SELECT arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)') AS time_window FROM numbers_with_ts GROUP BY time_window;",
                Timestamp::new(1740394109, TimeUnit::Second),
                (
                    "ts".to_string(),
                    Some(Timestamp::new(1740394080, TimeUnit::Second)),
                    Some(Timestamp::new(1740394140, TimeUnit::Second)),
                ),
                "SELECT arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)') AS time_window FROM numbers_with_ts WHERE ((ts >= CAST('2025-02-24 10:48:00' AS TIMESTAMP)) AND (ts <= CAST('2025-02-24 10:49:00' AS TIMESTAMP))) GROUP BY arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)')"
            ),
            // An existing `IN` filter is kept and AND-ed with the bound filter.
            (
                "SELECT arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)') AS time_window FROM numbers_with_ts WHERE number in (2, 3, 4) GROUP BY time_window;",
                Timestamp::new(1740394109, TimeUnit::Second),
                (
                    "ts".to_string(),
                    Some(Timestamp::new(1740394080, TimeUnit::Second)),
                    Some(Timestamp::new(1740394140, TimeUnit::Second)),
                ),
                "SELECT arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)') AS time_window FROM numbers_with_ts WHERE numbers_with_ts.number IN (2, 3, 4) AND ((ts >= CAST('2025-02-24 10:48:00' AS TIMESTAMP)) AND (ts <= CAST('2025-02-24 10:49:00' AS TIMESTAMP))) GROUP BY arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)')"
            ),
            // An existing `BETWEEN` filter is kept and AND-ed with the bound filter.
            (
                "SELECT arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)') AS time_window FROM numbers_with_ts WHERE number BETWEEN 2 AND 4 GROUP BY time_window;",
                Timestamp::new(1740394109, TimeUnit::Second),
                (
                    "ts".to_string(),
                    Some(Timestamp::new(1740394080, TimeUnit::Second)),
                    Some(Timestamp::new(1740394140, TimeUnit::Second)),
                ),
                "SELECT arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)') AS time_window FROM numbers_with_ts WHERE (numbers_with_ts.number BETWEEN 2 AND 4) AND ((ts >= CAST('2025-02-24 10:48:00' AS TIMESTAMP)) AND (ts <= CAST('2025-02-24 10:49:00' AS TIMESTAMP))) GROUP BY arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)')"
            ),
            // No GROUP BY, hence no aggregate: no time-window expression, no bounds, and the
            // SQL is left unchanged.
            (
                "SELECT date_bin('5 minutes', ts) FROM numbers_with_ts;",
                Timestamp::new(23, TimeUnit::Millisecond),
                ("ts".to_string(), None, None),
                "SELECT date_bin('5 minutes', ts) FROM numbers_with_ts;"
            ),
            // Nanosecond `current` values are converted to the column's unit before probing.
            (
                "SELECT date_bin('5 minutes', ts) as time_window FROM numbers_with_ts GROUP BY time_window;",
                Timestamp::new(23, TimeUnit::Nanosecond),
                (
                    "ts".to_string(),
                    Some(Timestamp::new(0, TimeUnit::Millisecond)),
                    Some(Timestamp::new(300000, TimeUnit::Millisecond)),
                ),
                "SELECT date_bin('5 minutes', numbers_with_ts.ts) AS time_window FROM numbers_with_ts WHERE ((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts <= CAST('1970-01-01 00:05:00' AS TIMESTAMP))) GROUP BY date_bin('5 minutes', numbers_with_ts.ts)"
            ),
            // `current` exactly at the window start.
            (
                "SELECT date_bin('5 minutes', ts) as time_window FROM numbers_with_ts GROUP BY time_window;",
                Timestamp::new(0, TimeUnit::Nanosecond),
                (
                    "ts".to_string(),
                    Some(Timestamp::new(0, TimeUnit::Millisecond)),
                    Some(Timestamp::new(300000, TimeUnit::Millisecond)),
                ),
                "SELECT date_bin('5 minutes', numbers_with_ts.ts) AS time_window FROM numbers_with_ts WHERE ((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts <= CAST('1970-01-01 00:05:00' AS TIMESTAMP))) GROUP BY date_bin('5 minutes', numbers_with_ts.ts)"
            ),
            // A whole-millisecond value given in nanoseconds.
            (
                "SELECT date_bin('5 minutes', ts) as time_window FROM numbers_with_ts GROUP BY time_window;",
                Timestamp::new(23_000_000, TimeUnit::Nanosecond),
                (
                    "ts".to_string(),
                    Some(Timestamp::new(0, TimeUnit::Millisecond)),
                    Some(Timestamp::new(300000, TimeUnit::Millisecond)),
                ),
                "SELECT date_bin('5 minutes', numbers_with_ts.ts) AS time_window FROM numbers_with_ts WHERE ((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts <= CAST('1970-01-01 00:05:00' AS TIMESTAMP))) GROUP BY date_bin('5 minutes', numbers_with_ts.ts)"
            ),
            // An aggregate function alongside the window.
            (
                "SELECT sum(number) as sum_up, date_bin('5 minutes', ts) as time_window FROM numbers_with_ts GROUP BY time_window;",
                Timestamp::new(23, TimeUnit::Millisecond),
                (
                    "ts".to_string(),
                    Some(Timestamp::new(0, TimeUnit::Millisecond)),
                    Some(Timestamp::new(300000, TimeUnit::Millisecond)),
                ),
                "SELECT sum(numbers_with_ts.number) AS sum_up, date_bin('5 minutes', numbers_with_ts.ts) AS time_window FROM numbers_with_ts WHERE ((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts <= CAST('1970-01-01 00:05:00' AS TIMESTAMP))) GROUP BY date_bin('5 minutes', numbers_with_ts.ts)"
            ),
            // Multiple group keys: only the time index key is used as the window.
            (
                "SELECT number, date_bin('5 minutes', ts) as time_window FROM numbers_with_ts GROUP BY time_window, number;",
                Timestamp::new(23, TimeUnit::Millisecond),
                (
                    "ts".to_string(),
                    Some(Timestamp::new(0, TimeUnit::Millisecond)),
                    Some(Timestamp::new(300000, TimeUnit::Millisecond)),
                ),
                "SELECT numbers_with_ts.number, date_bin('5 minutes', numbers_with_ts.ts) AS time_window FROM numbers_with_ts WHERE ((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts <= CAST('1970-01-01 00:05:00' AS TIMESTAMP))) GROUP BY date_bin('5 minutes', numbers_with_ts.ts), numbers_with_ts.number"
            ),
            // Aggregation inside a subquery.
            (
                "SELECT number, time_window FROM (SELECT number, date_bin('5 minutes', ts) as time_window FROM numbers_with_ts GROUP BY time_window, number);",
                Timestamp::new(23, TimeUnit::Millisecond),
                (
                    "ts".to_string(),
                    Some(Timestamp::new(0, TimeUnit::Millisecond)),
                    Some(Timestamp::new(300000, TimeUnit::Millisecond)),
                ),
                "SELECT numbers_with_ts.number, time_window FROM (SELECT numbers_with_ts.number, date_bin('5 minutes', numbers_with_ts.ts) AS time_window FROM numbers_with_ts WHERE ((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts <= CAST('1970-01-01 00:05:00' AS TIMESTAMP))) GROUP BY date_bin('5 minutes', numbers_with_ts.ts), numbers_with_ts.number)"
            ),
            // Aggregation inside a CTE.
            (
                "with cte as (select number, date_bin('5 minutes', ts) as time_window from numbers_with_ts GROUP BY time_window, number) select number, time_window from cte;",
                Timestamp::new(23, TimeUnit::Millisecond),
                (
                    "ts".to_string(),
                    Some(Timestamp::new(0, TimeUnit::Millisecond)),
                    Some(Timestamp::new(300000, TimeUnit::Millisecond)),
                ),
                "SELECT cte.number, cte.time_window FROM (SELECT numbers_with_ts.number, date_bin('5 minutes', numbers_with_ts.ts) AS time_window FROM numbers_with_ts WHERE ((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts <= CAST('1970-01-01 00:05:00' AS TIMESTAMP))) GROUP BY date_bin('5 minutes', numbers_with_ts.ts), numbers_with_ts.number) AS cte"
            ),
            // Aggregation over an unaliased subquery; the filter lands on the inner scan.
            (
                "SELECT sum(number), number, date_bin('5 minutes', ts) as time_window, bucket_name FROM (SELECT number, ts, case when number < 5 THEN 'bucket_0_5' when number >= 5 THEN 'bucket_5_inf' END as bucket_name FROM numbers_with_ts) GROUP BY number, time_window, bucket_name;",
                Timestamp::new(23, TimeUnit::Millisecond),
                (
                    "ts".to_string(),
                    Some(Timestamp::new(0, TimeUnit::Millisecond)),
                    Some(Timestamp::new(300000, TimeUnit::Millisecond)),
                ),
                "SELECT sum(numbers_with_ts.number), numbers_with_ts.number, date_bin('5 minutes', numbers_with_ts.ts) AS time_window, bucket_name FROM (SELECT numbers_with_ts.number, numbers_with_ts.ts, CASE WHEN (numbers_with_ts.number < 5) THEN 'bucket_0_5' WHEN (numbers_with_ts.number >= 5) THEN 'bucket_5_inf' END AS bucket_name FROM numbers_with_ts WHERE ((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts <= CAST('1970-01-01 00:05:00' AS TIMESTAMP)))) GROUP BY numbers_with_ts.number, date_bin('5 minutes', numbers_with_ts.ts), bucket_name"
            ),
            // Same as above, but the subquery is aliased as `cte`.
            (
                "SELECT sum(number), number, date_bin('5 minutes', ts) as time_window, bucket_name FROM (SELECT number, ts, case when number < 5 THEN 'bucket_0_5' when number >= 5 THEN 'bucket_5_inf' END as bucket_name FROM numbers_with_ts) as cte GROUP BY number, time_window, bucket_name;",
                Timestamp::new(23, TimeUnit::Millisecond),
                (
                    "ts".to_string(),
                    Some(Timestamp::new(0, TimeUnit::Millisecond)),
                    Some(Timestamp::new(300000, TimeUnit::Millisecond)),
                ),
                "SELECT sum(cte.number), cte.number, date_bin('5 minutes', cte.ts) AS time_window, cte.bucket_name FROM (SELECT numbers_with_ts.number, numbers_with_ts.ts, CASE WHEN (numbers_with_ts.number < 5) THEN 'bucket_0_5' WHEN (numbers_with_ts.number >= 5) THEN 'bucket_5_inf' END AS bucket_name FROM numbers_with_ts WHERE ((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts <= CAST('1970-01-01 00:05:00' AS TIMESTAMP)))) AS cte GROUP BY cte.number, date_bin('5 minutes', cte.ts), cte.bucket_name"
            ),
        ];

        for (sql, current, expected, expected_unparsed) in testcases {
            // The bounds are computed on the plan built with the last argument `true`.
            let plan = sql_to_df_plan(ctx.clone(), query_engine.clone(), sql, true)
                .await
                .unwrap();

            let real =
                find_plan_time_window_bound(&plan, current, ctx.clone(), query_engine.clone())
                    .await
                    .unwrap();
            assert_eq!(expected, real);

            // The filter is added to a plan built with `false` before unparsing back to SQL.
            let plan = sql_to_df_plan(ctx.clone(), query_engine.clone(), sql, false)
                .await
                .unwrap();
            let (col_name, lower, upper) = real;
            let new_sql = if lower.is_some() {
                // Turns a bound into a DataFusion scalar literal of the matching type.
                let to_df_literal = |value| {
                    let value = Value::from(value);

                    value.try_to_scalar_value(&value.data_type()).unwrap()
                };
                let lower = to_df_literal(lower.unwrap());
                let upper = to_df_literal(upper.unwrap());
                // Inclusive on both ends: `col >= lower AND col <= upper`.
                let expr = col(&col_name)
                    .gt_eq(lit(lower))
                    .and(col(&col_name).lt_eq(lit(upper)));
                let mut add_filter = AddFilterRewriter::new(expr);
                let plan = plan.rewrite(&mut add_filter).unwrap().data;
                df_plan_to_sql(&plan).unwrap()
            } else {
                sql.to_string()
            };
            assert_eq!(expected_unparsed, new_sql);
        }
    }
}