1use std::collections::BTreeSet;
19use std::sync::Arc;
20
21use api::helper::pb_value_to_value_ref;
22use arrow::array::{
23 TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
24 TimestampSecondArray,
25};
26use catalog::CatalogManagerRef;
27use common_error::ext::BoxedError;
28use common_recordbatch::DfRecordBatch;
29use common_telemetry::warn;
30use common_time::Timestamp;
31use common_time::timestamp::TimeUnit;
32use datafusion::error::Result as DfResult;
33use datafusion::execution::SessionState;
34use datafusion::logical_expr::Expr;
35use datafusion::physical_planner::{DefaultPhysicalPlanner, PhysicalPlanner};
36use datafusion_common::tree_node::{
37 Transformed, TreeNode, TreeNodeRecursion, TreeNodeRewriter, TreeNodeVisitor,
38};
39use datafusion_common::{DFSchema, TableReference};
40use datafusion_expr::{ColumnarValue, LogicalPlan};
41use datafusion_physical_expr::PhysicalExprRef;
42use datatypes::prelude::{ConcreteDataType, DataType};
43use datatypes::schema::TIME_INDEX_KEY;
44use datatypes::value::Value;
45use datatypes::vectors::{
46 TimestampMicrosecondVector, TimestampMillisecondVector, TimestampNanosecondVector,
47 TimestampSecondVector, Vector,
48};
49use itertools::Itertools;
50use session::context::QueryContextRef;
51use snafu::{OptionExt, ResultExt, ensure};
52
53use crate::Error;
54use crate::adapter::util::from_proto_to_data_type;
55use crate::error::{
56 ArrowSnafu, DatafusionSnafu, DatatypesSnafu, ExternalSnafu, PlanSnafu, TimeSnafu,
57 UnexpectedSnafu,
58};
59use crate::expr::error::DataTypeSnafu;
60
61const DEFAULT_TEST_TIMESTAMP: Timestamp = Timestamp::new_second(17_0000_0000);
63
/// Summary of a logical plan's shape, used to decide whether a time window expression can be
/// extracted from it.
#[derive(Default)]
struct TimeWindowPlanShape {
    /// Number of `TableScan` nodes seen so far.
    table_scan_count: usize,
    /// Number of `Aggregate` nodes seen so far.
    aggregate_count: usize,
    /// Whether a node kind that disables time-window pruning has been seen.
    has_unsupported_pruning_node: bool,
}

impl TimeWindowPlanShape {
    /// Returns `true` unless the plan has exactly one table scan, at most one aggregate and no
    /// unsupported node.
    fn should_skip_time_window_expr(&self) -> bool {
        // Anything that already stops inspection also forbids extraction; additionally a plan
        // without a table scan has no time index to bound.
        self.should_stop_inspection() || self.table_scan_count == 0
    }

    /// Returns `true` once the shape is known to be unsupported, so a walk can stop early.
    fn should_stop_inspection(&self) -> bool {
        let too_many_scans = self.table_scan_count > 1;
        let too_many_aggregates = self.aggregate_count > 1;
        self.has_unsupported_pruning_node || too_many_scans || too_many_aggregates
    }
}
84
85impl TreeNodeVisitor<'_> for TimeWindowPlanShape {
86 type Node = LogicalPlan;
87
88 fn f_down(&mut self, node: &Self::Node) -> DfResult<TreeNodeRecursion> {
89 match node {
90 LogicalPlan::TableScan(_) => {
91 self.table_scan_count += 1;
92 }
93 LogicalPlan::Aggregate(_) => {
94 self.aggregate_count += 1;
95 }
96 LogicalPlan::Join(_)
101 | LogicalPlan::Window(_)
102 | LogicalPlan::Union(_)
103 | LogicalPlan::Distinct(_)
104 | LogicalPlan::Limit(_)
105 | LogicalPlan::Sort(_)
106 | LogicalPlan::Extension(_)
107 | LogicalPlan::Dml(_)
108 | LogicalPlan::Ddl(_)
109 | LogicalPlan::Unnest(_)
110 | LogicalPlan::RecursiveQuery(_) => {
111 self.has_unsupported_pruning_node = true;
112 }
113 _ => {}
114 }
115
116 if self.should_stop_inspection() {
121 Ok(TreeNodeRecursion::Stop)
122 } else {
123 Ok(TreeNodeRecursion::Continue)
124 }
125 }
126}
127
128fn inspect_time_window_plan_shape(plan: &LogicalPlan) -> DfResult<TimeWindowPlanShape> {
129 let mut shape = TimeWindowPlanShape::default();
130 plan.visit_with_subqueries(&mut shape)?;
131 Ok(shape)
132}
133
134fn should_skip_time_window_expr(plan: &LogicalPlan) -> bool {
135 let Ok(shape) = inspect_time_window_plan_shape(plan) else {
136 return true;
137 };
138
139 shape.should_skip_time_window_expr()
140}
141
/// A time window expression together with the state needed to evaluate it on timestamps.
#[derive(Debug, Clone)]
pub struct TimeWindowExpr {
    /// Physical form of `logical_expr`, planned against `df_schema`.
    phy_expr: PhysicalExprRef,
    /// Name of the time column, looked up by name in incoming rows.
    pub column_name: String,
    /// The logical expression this was built from.
    logical_expr: Expr,
    /// Schema the expression is evaluated against. Evaluation builds a one-column record batch
    /// from its first field, so it presumably holds only the time column — verify callers.
    df_schema: DFSchema,
    /// Window size measured once at construction; `None` if it could not be determined.
    eval_time_window_size: Option<std::time::Duration>,
    /// Lower bound of the window probed at construction, used as the alignment origin when
    /// computing later windows arithmetically.
    eval_time_original: Option<Timestamp>,
}
160
161impl std::fmt::Display for TimeWindowExpr {
162 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
163 f.debug_struct("TimeWindowExpr")
164 .field("phy_expr", &self.phy_expr.to_string())
165 .field("column_name", &self.column_name)
166 .field("logical_expr", &self.logical_expr.to_string())
167 .field("df_schema", &self.df_schema)
168 .finish()
169 }
170}
171
172impl TimeWindowExpr {
173 pub fn time_window_size(&self) -> &Option<std::time::Duration> {
175 &self.eval_time_window_size
176 }
177
178 pub fn from_expr(
179 expr: &Expr,
180 column_name: &str,
181 df_schema: &DFSchema,
182 session: &SessionState,
183 ) -> Result<Self, Error> {
184 let phy_expr: PhysicalExprRef = to_phy_expr(expr, df_schema, session)?;
185 let mut zelf = Self {
186 phy_expr,
187 column_name: column_name.to_string(),
188 logical_expr: expr.clone(),
189 df_schema: df_schema.clone(),
190 eval_time_window_size: None,
191 eval_time_original: None,
192 };
193 let test_ts = DEFAULT_TEST_TIMESTAMP;
194 let (lower, upper) = zelf.eval(test_ts)?;
195 let time_window_size = match (lower, upper) {
196 (Some(l), Some(u)) => u.sub(&l).map(|r| r.to_std()).transpose().map_err(|_| {
197 UnexpectedSnafu {
198 reason: format!(
199 "Expect upper bound older than lower bound, found upper={u:?} and lower={l:?}"
200 ),
201 }
202 .build()
203 })?,
204 _ => None,
205 };
206 zelf.eval_time_window_size = time_window_size;
207 zelf.eval_time_original = lower;
208
209 Ok(zelf)
210 }
211
212 pub fn eval(
214 &self,
215 current: Timestamp,
216 ) -> Result<(Option<Timestamp>, Option<Timestamp>), Error> {
217 fn compute_distance(time_diff_ns: i64, stride_ns: i64) -> i64 {
218 if stride_ns == 0 {
219 return time_diff_ns;
220 }
221 let time_delta = time_diff_ns - (time_diff_ns % stride_ns);
223
224 if time_diff_ns < 0 && time_delta != time_diff_ns {
225 time_delta - stride_ns
228 } else {
229 time_delta
230 }
231 }
232
233 if let (Some(original), Some(window_size)) =
236 (self.eval_time_original, self.eval_time_window_size)
237 {
238 let time_diff_ns = current.sub(&original).and_then(|s|s.num_nanoseconds()).with_context(||UnexpectedSnafu {
240 reason: format!(
241 "Failed to compute time difference between current {current:?} and original {original:?}"
242 ),
243 })?;
244
245 let window_size_ns = window_size.as_nanos() as i64;
246
247 let distance_ns = compute_distance(time_diff_ns, window_size_ns);
248
249 let lower_bound = if distance_ns >= 0 {
250 original.add_duration(std::time::Duration::from_nanos(distance_ns as u64))
251 } else {
252 original.sub_duration(std::time::Duration::from_nanos((-distance_ns) as u64))
253 }
254 .context(TimeSnafu)?;
255 let upper_bound = lower_bound.add_duration(window_size).context(TimeSnafu)?;
256
257 return Ok((Some(lower_bound), Some(upper_bound)));
258 }
259
260 let lower_bound =
261 calc_expr_time_window_lower_bound(&self.phy_expr, &self.df_schema, current)?;
262 let upper_bound =
263 probe_expr_time_window_upper_bound(&self.phy_expr, &self.df_schema, current)?;
264 Ok((lower_bound, upper_bound))
265 }
266
267 pub async fn handle_rows(
271 &self,
272 rows_list: Vec<api::v1::Rows>,
273 ) -> Result<BTreeSet<Timestamp>, Error> {
274 let mut time_windows = BTreeSet::new();
275
276 for rows in rows_list {
277 let ts_col_index = rows
280 .schema
281 .iter()
282 .map(|col| col.column_name.clone())
283 .position(|name| name == self.column_name);
284 let Some(ts_col_index) = ts_col_index else {
285 warn!("can't found time index column in schema: {:?}", rows.schema);
286 continue;
287 };
288 let col_schema = &rows.schema[ts_col_index];
289 let cdt = from_proto_to_data_type(col_schema)?;
290
291 let mut vector = cdt.create_mutable_vector(rows.rows.len());
292 for row in rows.rows {
293 let value = pb_value_to_value_ref(&row.values[ts_col_index], None);
294 vector.try_push_value_ref(&value).context(DataTypeSnafu {
295 msg: "Failed to convert rows to columns",
296 })?;
297 }
298 let vector = vector.to_vector();
299
300 let df_schema = create_df_schema_for_ts_column(&self.column_name, cdt)?;
301
302 let rb =
303 DfRecordBatch::try_new(df_schema.inner().clone(), vec![vector.to_arrow_array()])
304 .with_context(|_e| ArrowSnafu {
305 context: format!(
306 "Failed to create record batch from {df_schema:?} and {vector:?}"
307 ),
308 })?;
309
310 let eval_res = self
311 .phy_expr
312 .evaluate(&rb)
313 .with_context(|_| DatafusionSnafu {
314 context: format!(
315 "Failed to evaluate physical expression {:?} on {rb:?}",
316 self.phy_expr
317 ),
318 })?;
319
320 let res = columnar_to_ts_vector(&eval_res)?;
321
322 for ts in res.into_iter().flatten() {
323 time_windows.insert(ts);
324 }
325 }
326
327 Ok(time_windows)
328 }
329}
330
331fn create_df_schema_for_ts_column(name: &str, cdt: ConcreteDataType) -> Result<DFSchema, Error> {
332 let arrow_schema = Arc::new(arrow_schema::Schema::new(vec![arrow_schema::Field::new(
333 name,
334 cdt.as_arrow_type(),
335 false,
336 )]));
337
338 let df_schema = DFSchema::from_field_specific_qualified_schema(
339 vec![Some(TableReference::bare("TimeIndexOnlyTable"))],
340 &arrow_schema,
341 )
342 .with_context(|_e| DatafusionSnafu {
343 context: format!("Failed to create DFSchema from arrow schema {arrow_schema:?}"),
344 })?;
345
346 Ok(df_schema)
347}
348
349fn columnar_to_ts_vector(columnar: &ColumnarValue) -> Result<Vec<Option<Timestamp>>, Error> {
351 let val = match columnar {
352 datafusion_expr::ColumnarValue::Array(array) => {
353 let ty = array.data_type();
354 let ty = ConcreteDataType::from_arrow_type(ty);
355 let time_unit = if let ConcreteDataType::Timestamp(ty) = ty {
356 ty.unit()
357 } else {
358 return UnexpectedSnafu {
359 reason: format!("Non-timestamp type: {ty:?}"),
360 }
361 .fail();
362 };
363
364 match time_unit {
365 TimeUnit::Second => array
366 .as_ref()
367 .as_any()
368 .downcast_ref::<TimestampSecondArray>()
369 .with_context(|| PlanSnafu {
370 reason: format!("Failed to create vector from arrow array {array:?}"),
371 })?
372 .values()
373 .iter()
374 .map(|d| Some(Timestamp::new(*d, time_unit)))
375 .collect_vec(),
376 TimeUnit::Millisecond => array
377 .as_ref()
378 .as_any()
379 .downcast_ref::<TimestampMillisecondArray>()
380 .with_context(|| PlanSnafu {
381 reason: format!("Failed to create vector from arrow array {array:?}"),
382 })?
383 .values()
384 .iter()
385 .map(|d| Some(Timestamp::new(*d, time_unit)))
386 .collect_vec(),
387 TimeUnit::Microsecond => array
388 .as_ref()
389 .as_any()
390 .downcast_ref::<TimestampMicrosecondArray>()
391 .with_context(|| PlanSnafu {
392 reason: format!("Failed to create vector from arrow array {array:?}"),
393 })?
394 .values()
395 .iter()
396 .map(|d| Some(Timestamp::new(*d, time_unit)))
397 .collect_vec(),
398 TimeUnit::Nanosecond => array
399 .as_ref()
400 .as_any()
401 .downcast_ref::<TimestampNanosecondArray>()
402 .with_context(|| PlanSnafu {
403 reason: format!("Failed to create vector from arrow array {array:?}"),
404 })?
405 .values()
406 .iter()
407 .map(|d| Some(Timestamp::new(*d, time_unit)))
408 .collect_vec(),
409 }
410 }
411 datafusion_expr::ColumnarValue::Scalar(scalar) => {
412 let value = Value::try_from(scalar.clone()).with_context(|_| DatatypesSnafu {
413 extra: format!("Failed to convert scalar {scalar:?} to value"),
414 })?;
415 let ts = value.as_timestamp().context(UnexpectedSnafu {
416 reason: format!("Expect Timestamp, found {:?}", value),
417 })?;
418 vec![Some(ts)]
419 }
420 };
421 Ok(val)
422}
423
424pub async fn find_time_window_expr(
431 plan: &LogicalPlan,
432 catalog_man: CatalogManagerRef,
433 query_ctx: QueryContextRef,
434) -> Result<(String, Option<datafusion_expr::Expr>, TimeUnit, DFSchema), Error> {
435 if should_skip_time_window_expr(plan) {
442 return Ok((
445 String::new(),
446 None,
447 TimeUnit::Millisecond,
448 DFSchema::empty(),
449 ));
450 }
451
452 let mut table_name = None;
453
454 plan.apply(|plan| {
456 let LogicalPlan::TableScan(table_scan) = plan else {
457 return Ok(TreeNodeRecursion::Continue);
458 };
459 table_name = Some(table_scan.table_name.clone());
460 Ok(TreeNodeRecursion::Stop)
461 })
462 .with_context(|_| DatafusionSnafu {
463 context: format!("Can't find table source in plan {plan:?}"),
464 })?;
465 let Some(table_name) = table_name else {
466 UnexpectedSnafu {
467 reason: format!("Can't find table source in plan {plan:?}"),
468 }
469 .fail()?
470 };
471
472 let current_schema = query_ctx.current_schema();
473
474 let catalog_name = table_name.catalog().unwrap_or(query_ctx.current_catalog());
475 let schema_name = table_name.schema().unwrap_or(¤t_schema);
476 let table_name = table_name.table();
477
478 let Some(table_ref) = catalog_man
479 .table(catalog_name, schema_name, table_name, Some(&query_ctx))
480 .await
481 .map_err(BoxedError::new)
482 .context(ExternalSnafu)?
483 else {
484 UnexpectedSnafu {
485 reason: format!(
486 "Can't find table {table_name:?} in catalog {catalog_name:?}/{schema_name:?}"
487 ),
488 }
489 .fail()?
490 };
491
492 let schema = &table_ref.table_info().meta.schema;
493
494 let ts_index = schema.timestamp_column().with_context(|| UnexpectedSnafu {
495 reason: format!("Can't find timestamp column in table {table_name:?}"),
496 })?;
497
498 let ts_col_name = ts_index.name.clone();
499
500 let expected_time_unit = ts_index.data_type.as_timestamp().with_context(|| UnexpectedSnafu {
501 reason: format!(
502 "Expected timestamp column {ts_col_name:?} in table {table_name:?} to be timestamp, but got {ts_index:?}"
503 ),
504 })?.unit();
505
506 let arrow_schema = Arc::new(arrow_schema::Schema::new(vec![arrow_schema::Field::new(
507 ts_col_name.clone(),
508 ts_index.data_type.as_arrow_type(),
509 false,
510 )]));
511
512 let df_schema = DFSchema::from_field_specific_qualified_schema(
513 vec![Some(TableReference::bare(table_name))],
514 &arrow_schema,
515 )
516 .with_context(|_e| DatafusionSnafu {
517 context: format!("Failed to create DFSchema from arrow schema {arrow_schema:?}"),
518 })?;
519
520 let mut aggr_expr = None;
522 let mut time_window_expr: Option<Expr> = None;
523
524 let find_inner_aggr_expr = |plan: &LogicalPlan| {
525 if let LogicalPlan::Aggregate(aggregate) = plan {
526 aggr_expr = Some(aggregate.clone());
527 };
528
529 Ok(TreeNodeRecursion::Continue)
530 };
531 plan.apply(find_inner_aggr_expr)
532 .with_context(|_| DatafusionSnafu {
533 context: format!("Can't find aggr expr in plan {plan:?}"),
534 })?;
535
536 if let Some(aggregate) = aggr_expr {
537 for group_expr in &aggregate.group_expr {
538 let refs = group_expr.column_refs();
539 if refs.len() != 1 {
540 continue;
541 }
542 let ref_col = refs.iter().next().unwrap();
543
544 let index = aggregate.input.schema().maybe_index_of_column(ref_col);
545 let Some(index) = index else {
546 continue;
547 };
548 let field = aggregate.input.schema().field(index);
549
550 let is_time_index =
552 field.metadata().get(TIME_INDEX_KEY).map(|s| s.as_str()) == Some("true");
553
554 if is_time_index {
555 let rewrite_column = group_expr.clone();
556 let rewritten = rewrite_column
557 .rewrite(&mut RewriteColumn {
558 table_name: table_name.to_string(),
559 })
560 .with_context(|_| DatafusionSnafu {
561 context: format!("Rewrite expr failed, expr={:?}", group_expr),
562 })?
563 .data;
564 struct RewriteColumn {
565 table_name: String,
566 }
567
568 impl TreeNodeRewriter for RewriteColumn {
569 type Node = Expr;
570 fn f_down(&mut self, node: Self::Node) -> DfResult<Transformed<Self::Node>> {
571 let Expr::Column(mut column) = node else {
572 return Ok(Transformed::no(node));
573 };
574
575 column.relation = Some(TableReference::bare(self.table_name.clone()));
576
577 Ok(Transformed::yes(Expr::Column(column)))
578 }
579 }
580
581 time_window_expr = Some(rewritten);
582 break;
583 }
584 }
585 Ok((ts_col_name, time_window_expr, expected_time_unit, df_schema))
586 } else {
587 Ok((ts_col_name, None, expected_time_unit, df_schema))
589 }
590}
591
#[cfg(test)]
/// Test helper that computes the `(lower, upper)` window bounds of `plan` around `current`.
///
/// `current` is first converted to the table's time index unit. If the plan has no time
/// window expression, both bounds are `None`.
pub async fn find_plan_time_window_bound(
    plan: &LogicalPlan,
    current: Timestamp,
    query_ctx: QueryContextRef,
    engine: query::QueryEngineRef,
) -> Result<(String, Option<Timestamp>, Option<Timestamp>), Error> {
    let catalog_man = engine.engine_state().catalog_manager();

    let (ts_col_name, time_window_expr, expected_time_unit, df_schema) =
        find_time_window_expr(plan, catalog_man.clone(), query_ctx).await?;

    let probe_ts = current
        .convert_to(expected_time_unit)
        .with_context(|| UnexpectedSnafu {
            reason: format!("Failed to cast current timestamp {current:?} to {expected_time_unit}"),
        })?;

    let Some(time_window_expr) = time_window_expr else {
        return Ok((ts_col_name, None, None));
    };

    let session_state = engine.engine_state().session_state();
    let phy_expr = to_phy_expr(&time_window_expr, &df_schema, &session_state)?;
    let lower_bound = calc_expr_time_window_lower_bound(&phy_expr, &df_schema, probe_ts)?;
    let upper_bound = probe_expr_time_window_upper_bound(&phy_expr, &df_schema, probe_ts)?;
    Ok((ts_col_name, lower_bound, upper_bound))
}
636
637fn calc_expr_time_window_lower_bound(
646 phy_expr: &PhysicalExprRef,
647 df_schema: &DFSchema,
648 current: Timestamp,
649) -> Result<Option<Timestamp>, Error> {
650 let cur_time_window = eval_phy_time_window_expr(phy_expr, df_schema, current)?;
651 let input_time_unit = cur_time_window.unit();
652 Ok(cur_time_window.convert_to(input_time_unit))
653}
654
655fn probe_expr_time_window_upper_bound(
657 phy_expr: &PhysicalExprRef,
658 df_schema: &DFSchema,
659 current: Timestamp,
660) -> Result<Option<Timestamp>, Error> {
661 use std::cmp::Ordering;
663
664 let cur_time_window = eval_phy_time_window_expr(phy_expr, df_schema, current)?;
665
666 let mut offset: i64 = 1;
668 let mut lower_bound = Some(current);
669 let upper_bound;
670 loop {
672 let Some(next_val) = current.value().checked_add(offset) else {
673 return Ok(None);
675 };
676
677 let next_time_probe = common_time::Timestamp::new(next_val, current.unit());
678
679 let next_time_window = eval_phy_time_window_expr(phy_expr, df_schema, next_time_probe)?;
680
681 match next_time_window.cmp(&cur_time_window) {
682 Ordering::Less => UnexpectedSnafu {
683 reason: format!(
684 "Unsupported time window expression, expect monotonic increasing for time window expression {phy_expr:?}"
685 ),
686 }
687 .fail()?,
688 Ordering::Equal => {
689 lower_bound = Some(next_time_probe);
690 }
691 Ordering::Greater => {
692 upper_bound = Some(next_time_probe);
693 break
694 }
695 }
696
697 let Some(new_offset) = offset.checked_mul(2) else {
698 return Ok(None);
700 };
701 offset = new_offset;
702 }
703
704 binary_search_expr(
707 lower_bound,
708 upper_bound,
709 cur_time_window,
710 phy_expr,
711 df_schema,
712 )
713 .map(Some)
714}
715
716fn binary_search_expr(
717 lower_bound: Option<Timestamp>,
718 upper_bound: Option<Timestamp>,
719 cur_time_window: Timestamp,
720 phy_expr: &PhysicalExprRef,
721 df_schema: &DFSchema,
722) -> Result<Timestamp, Error> {
723 ensure!(
724 lower_bound.map(|v| v.unit()) == upper_bound.map(|v| v.unit()),
725 UnexpectedSnafu {
726 reason: format!(
727 " unit mismatch for time window expression {phy_expr:?}, found {lower_bound:?} and {upper_bound:?}"
728 ),
729 }
730 );
731
732 let output_unit = upper_bound
733 .context(UnexpectedSnafu {
734 reason: "should have lower bound",
735 })?
736 .unit();
737
738 let mut low = lower_bound
739 .context(UnexpectedSnafu {
740 reason: "should have lower bound",
741 })?
742 .value();
743 let mut high = upper_bound
744 .context(UnexpectedSnafu {
745 reason: "should have upper bound",
746 })?
747 .value();
748 while low < high {
749 let mid = (low + high) / 2;
750 let mid_probe = common_time::Timestamp::new(mid, output_unit);
751 let mid_time_window = eval_phy_time_window_expr(phy_expr, df_schema, mid_probe)?;
752
753 match mid_time_window.cmp(&cur_time_window) {
754 std::cmp::Ordering::Less => UnexpectedSnafu {
755 reason: format!("Binary search failed for time window expression {phy_expr:?}"),
756 }
757 .fail()?,
758 std::cmp::Ordering::Equal => low = mid + 1,
759 std::cmp::Ordering::Greater => high = mid,
760 }
761 }
762
763 let final_upper_bound_for_time_window = common_time::Timestamp::new(high, output_unit);
764 Ok(final_upper_bound_for_time_window)
765}
766
767fn eval_phy_time_window_expr(
769 phy: &PhysicalExprRef,
770 df_schema: &DFSchema,
771 input_value: Timestamp,
772) -> Result<Timestamp, Error> {
773 let schema_ty = df_schema.field(0).data_type();
774 let schema_cdt = ConcreteDataType::from_arrow_type(schema_ty);
775 let schema_unit = if let ConcreteDataType::Timestamp(ts) = schema_cdt {
776 ts.unit()
777 } else {
778 return UnexpectedSnafu {
779 reason: format!("Expect Timestamp, found {:?}", schema_cdt),
780 }
781 .fail();
782 };
783 let input_value = input_value
784 .convert_to(schema_unit)
785 .with_context(|| UnexpectedSnafu {
786 reason: format!("Failed to convert timestamp {input_value:?} to {schema_unit}"),
787 })?;
788 let ts_vector = match schema_unit {
789 TimeUnit::Second => {
790 TimestampSecondVector::from_vec(vec![input_value.value()]).to_arrow_array()
791 }
792 TimeUnit::Millisecond => {
793 TimestampMillisecondVector::from_vec(vec![input_value.value()]).to_arrow_array()
794 }
795 TimeUnit::Microsecond => {
796 TimestampMicrosecondVector::from_vec(vec![input_value.value()]).to_arrow_array()
797 }
798 TimeUnit::Nanosecond => {
799 TimestampNanosecondVector::from_vec(vec![input_value.value()]).to_arrow_array()
800 }
801 };
802
803 let rb = DfRecordBatch::try_new(df_schema.inner().clone(), vec![ts_vector.clone()])
804 .with_context(|_| ArrowSnafu {
805 context: format!("Failed to create record batch from {df_schema:?} and {ts_vector:?}"),
806 })?;
807
808 let eval_res = phy.evaluate(&rb).with_context(|_| DatafusionSnafu {
809 context: format!("Failed to evaluate physical expression {phy:?} on {rb:?}"),
810 })?;
811
812 if let Some(Some(ts)) = columnar_to_ts_vector(&eval_res)?.first() {
813 Ok(*ts)
814 } else {
815 UnexpectedSnafu {
816 reason: format!(
817 "Expected timestamp in expression {phy:?} but got {:?}",
818 eval_res
819 ),
820 }
821 .fail()?
822 }
823}
824
825fn to_phy_expr(
826 expr: &Expr,
827 df_schema: &DFSchema,
828 session: &SessionState,
829) -> Result<PhysicalExprRef, Error> {
830 let phy_planner = DefaultPhysicalPlanner::default();
831
832 let phy_expr: PhysicalExprRef = phy_planner
833 .create_physical_expr(expr, df_schema, session)
834 .with_context(|_e| DatafusionSnafu {
835 context: format!(
836 "Failed to create physical expression from {expr:?} using {df_schema:?}"
837 ),
838 })?;
839 Ok(phy_expr)
840}
841
842#[cfg(test)]
843mod test {
844 use datafusion_common::tree_node::TreeNode;
845 use pretty_assertions::assert_eq;
846 use session::context::QueryContext;
847
848 use super::*;
849 use crate::batching_mode::utils::{AddFilterRewriter, df_plan_to_sql, sql_to_df_plan};
850 use crate::test_utils::create_test_query_engine;
851
852 #[tokio::test]
853 async fn test_plan_time_window_lower_bound() {
854 use datafusion_expr::{col, lit};
855 let query_engine = create_test_query_engine();
856 let ctx = QueryContext::arc();
857
858 let testcases = [
859 (
861 "SELECT arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)') AS ts FROM numbers_with_ts GROUP BY ts;",
862 Timestamp::new(1740394109, TimeUnit::Second),
863 (
864 "ts".to_string(),
865 Some(Timestamp::new(1740394109000, TimeUnit::Millisecond)),
866 Some(Timestamp::new(1740394109001, TimeUnit::Millisecond)),
867 ),
868 r#"SELECT arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)') AS ts FROM numbers_with_ts WHERE ((ts >= CAST('2025-02-24 10:48:29' AS TIMESTAMP)) AND (ts <= CAST('2025-02-24 10:48:29.001' AS TIMESTAMP))) GROUP BY numbers_with_ts.ts"#,
869 ),
870 (
872 "SELECT arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)') AS time_window FROM numbers_with_ts GROUP BY time_window;",
873 Timestamp::new(1740394109, TimeUnit::Second),
874 (
875 "ts".to_string(),
876 Some(Timestamp::new(1740394080, TimeUnit::Second)),
877 Some(Timestamp::new(1740394140, TimeUnit::Second)),
878 ),
879 "SELECT arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)') AS time_window FROM numbers_with_ts WHERE ((ts >= CAST('2025-02-24 10:48:00' AS TIMESTAMP)) AND (ts <= CAST('2025-02-24 10:49:00' AS TIMESTAMP))) GROUP BY arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)')",
880 ),
881 (
883 "SELECT arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)') AS time_window FROM numbers_with_ts WHERE number in (2, 3, 4) GROUP BY time_window;",
884 Timestamp::new(1740394109, TimeUnit::Second),
885 (
886 "ts".to_string(),
887 Some(Timestamp::new(1740394080, TimeUnit::Second)),
888 Some(Timestamp::new(1740394140, TimeUnit::Second)),
889 ),
890 "SELECT arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)') AS time_window FROM numbers_with_ts WHERE numbers_with_ts.number IN (2, 3, 4) AND ((ts >= CAST('2025-02-24 10:48:00' AS TIMESTAMP)) AND (ts <= CAST('2025-02-24 10:49:00' AS TIMESTAMP))) GROUP BY arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)')",
891 ),
892 (
894 "SELECT arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)') AS time_window FROM numbers_with_ts WHERE number BETWEEN 2 AND 4 GROUP BY time_window;",
895 Timestamp::new(1740394109, TimeUnit::Second),
896 (
897 "ts".to_string(),
898 Some(Timestamp::new(1740394080, TimeUnit::Second)),
899 Some(Timestamp::new(1740394140, TimeUnit::Second)),
900 ),
901 "SELECT arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)') AS time_window FROM numbers_with_ts WHERE (numbers_with_ts.number BETWEEN 2 AND 4) AND ((ts >= CAST('2025-02-24 10:48:00' AS TIMESTAMP)) AND (ts <= CAST('2025-02-24 10:49:00' AS TIMESTAMP))) GROUP BY arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)')",
902 ),
903 (
905 "SELECT date_bin('5 minutes', ts) FROM numbers_with_ts;",
906 Timestamp::new(23, TimeUnit::Millisecond),
907 ("ts".to_string(), None, None),
908 "SELECT date_bin('5 minutes', ts) FROM numbers_with_ts;",
909 ),
910 (
912 "SELECT date_bin('5 minutes', ts) as time_window FROM numbers_with_ts GROUP BY time_window;",
913 Timestamp::new(23, TimeUnit::Nanosecond),
914 (
915 "ts".to_string(),
916 Some(Timestamp::new(0, TimeUnit::Millisecond)),
917 Some(Timestamp::new(300000, TimeUnit::Millisecond)),
918 ),
919 "SELECT date_bin('5 minutes', numbers_with_ts.ts) AS time_window FROM numbers_with_ts WHERE ((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts <= CAST('1970-01-01 00:05:00' AS TIMESTAMP))) GROUP BY date_bin('5 minutes', numbers_with_ts.ts)",
920 ),
921 (
923 "SELECT date_bin('5 minutes', ts) as time_window FROM numbers_with_ts GROUP BY time_window;",
924 Timestamp::new(0, TimeUnit::Nanosecond),
925 (
926 "ts".to_string(),
927 Some(Timestamp::new(0, TimeUnit::Millisecond)),
928 Some(Timestamp::new(300000, TimeUnit::Millisecond)),
929 ),
930 "SELECT date_bin('5 minutes', numbers_with_ts.ts) AS time_window FROM numbers_with_ts WHERE ((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts <= CAST('1970-01-01 00:05:00' AS TIMESTAMP))) GROUP BY date_bin('5 minutes', numbers_with_ts.ts)",
931 ),
932 (
934 "SELECT date_bin('5 minutes', ts) as time_window FROM numbers_with_ts GROUP BY time_window;",
935 Timestamp::new(23_000_000, TimeUnit::Nanosecond),
936 (
937 "ts".to_string(),
938 Some(Timestamp::new(0, TimeUnit::Millisecond)),
939 Some(Timestamp::new(300000, TimeUnit::Millisecond)),
940 ),
941 "SELECT date_bin('5 minutes', numbers_with_ts.ts) AS time_window FROM numbers_with_ts WHERE ((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts <= CAST('1970-01-01 00:05:00' AS TIMESTAMP))) GROUP BY date_bin('5 minutes', numbers_with_ts.ts)",
942 ),
943 (
945 "SELECT sum(number) as sum_up, date_bin('5 minutes', ts) as time_window FROM numbers_with_ts GROUP BY time_window;",
946 Timestamp::new(23, TimeUnit::Millisecond),
947 (
948 "ts".to_string(),
949 Some(Timestamp::new(0, TimeUnit::Millisecond)),
950 Some(Timestamp::new(300000, TimeUnit::Millisecond)),
951 ),
952 "SELECT sum(numbers_with_ts.number) AS sum_up, date_bin('5 minutes', numbers_with_ts.ts) AS time_window FROM numbers_with_ts WHERE ((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts <= CAST('1970-01-01 00:05:00' AS TIMESTAMP))) GROUP BY date_bin('5 minutes', numbers_with_ts.ts)",
953 ),
954 (
956 "SELECT number, date_bin('5 minutes', ts) as time_window FROM numbers_with_ts GROUP BY time_window, number;",
957 Timestamp::new(23, TimeUnit::Millisecond),
958 (
959 "ts".to_string(),
960 Some(Timestamp::new(0, TimeUnit::Millisecond)),
961 Some(Timestamp::new(300000, TimeUnit::Millisecond)),
962 ),
963 "SELECT numbers_with_ts.number, date_bin('5 minutes', numbers_with_ts.ts) AS time_window FROM numbers_with_ts WHERE ((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts <= CAST('1970-01-01 00:05:00' AS TIMESTAMP))) GROUP BY date_bin('5 minutes', numbers_with_ts.ts), numbers_with_ts.number",
964 ),
965 (
967 "SELECT number, time_window FROM (SELECT number, date_bin('5 minutes', ts) as time_window FROM numbers_with_ts GROUP BY time_window, number);",
968 Timestamp::new(23, TimeUnit::Millisecond),
969 (
970 "ts".to_string(),
971 Some(Timestamp::new(0, TimeUnit::Millisecond)),
972 Some(Timestamp::new(300000, TimeUnit::Millisecond)),
973 ),
974 "SELECT numbers_with_ts.number, time_window FROM (SELECT numbers_with_ts.number, date_bin('5 minutes', numbers_with_ts.ts) AS time_window FROM numbers_with_ts WHERE ((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts <= CAST('1970-01-01 00:05:00' AS TIMESTAMP))) GROUP BY date_bin('5 minutes', numbers_with_ts.ts), numbers_with_ts.number)",
975 ),
976 (
978 "with cte as (select number, date_bin('5 minutes', ts) as time_window from numbers_with_ts GROUP BY time_window, number) select number, time_window from cte;",
979 Timestamp::new(23, TimeUnit::Millisecond),
980 (
981 "ts".to_string(),
982 Some(Timestamp::new(0, TimeUnit::Millisecond)),
983 Some(Timestamp::new(300000, TimeUnit::Millisecond)),
984 ),
985 "SELECT cte.number, cte.time_window FROM (SELECT numbers_with_ts.number, date_bin('5 minutes', numbers_with_ts.ts) AS time_window FROM numbers_with_ts WHERE ((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts <= CAST('1970-01-01 00:05:00' AS TIMESTAMP))) GROUP BY date_bin('5 minutes', numbers_with_ts.ts), numbers_with_ts.number) AS cte",
986 ),
987 (
989 "SELECT sum(number), number, date_bin('5 minutes', ts) as time_window, bucket_name FROM (SELECT number, ts, case when number < 5 THEN 'bucket_0_5' when number >= 5 THEN 'bucket_5_inf' END as bucket_name FROM numbers_with_ts) GROUP BY number, time_window, bucket_name;",
990 Timestamp::new(23, TimeUnit::Millisecond),
991 (
992 "ts".to_string(),
993 Some(Timestamp::new(0, TimeUnit::Millisecond)),
994 Some(Timestamp::new(300000, TimeUnit::Millisecond)),
995 ),
996 "SELECT sum(numbers_with_ts.number), numbers_with_ts.number, date_bin('5 minutes', numbers_with_ts.ts) AS time_window, bucket_name FROM (SELECT numbers_with_ts.number, numbers_with_ts.ts, CASE WHEN (numbers_with_ts.number < 5) THEN 'bucket_0_5' WHEN (numbers_with_ts.number >= 5) THEN 'bucket_5_inf' END AS bucket_name FROM numbers_with_ts WHERE ((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts <= CAST('1970-01-01 00:05:00' AS TIMESTAMP)))) GROUP BY numbers_with_ts.number, date_bin('5 minutes', numbers_with_ts.ts), bucket_name",
997 ),
998 (
1000 "SELECT sum(number), number, date_bin('5 minutes', ts) as time_window, bucket_name FROM (SELECT number, ts, case when number < 5 THEN 'bucket_0_5' when number >= 5 THEN 'bucket_5_inf' END as bucket_name FROM numbers_with_ts) as cte GROUP BY number, time_window, bucket_name;",
1001 Timestamp::new(23, TimeUnit::Millisecond),
1002 (
1003 "ts".to_string(),
1004 Some(Timestamp::new(0, TimeUnit::Millisecond)),
1005 Some(Timestamp::new(300000, TimeUnit::Millisecond)),
1006 ),
1007 "SELECT sum(cte.number), cte.number, date_bin('5 minutes', cte.ts) AS time_window, cte.bucket_name FROM (SELECT numbers_with_ts.number, numbers_with_ts.ts, CASE WHEN (numbers_with_ts.number < 5) THEN 'bucket_0_5' WHEN (numbers_with_ts.number >= 5) THEN 'bucket_5_inf' END AS bucket_name FROM numbers_with_ts WHERE ((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts <= CAST('1970-01-01 00:05:00' AS TIMESTAMP)))) AS cte GROUP BY cte.number, date_bin('5 minutes', cte.ts), cte.bucket_name",
1008 ),
1009 ];
1010
1011 for (sql, current, expected, expected_unparsed) in testcases {
1012 let plan = sql_to_df_plan(ctx.clone(), query_engine.clone(), sql, true)
1013 .await
1014 .unwrap();
1015
1016 let real =
1017 find_plan_time_window_bound(&plan, current, ctx.clone(), query_engine.clone())
1018 .await
1019 .unwrap();
1020 assert_eq!(expected, real);
1021
1022 let plan = sql_to_df_plan(ctx.clone(), query_engine.clone(), sql, false)
1023 .await
1024 .unwrap();
1025 let (col_name, lower, upper) = real;
1026 let new_sql = if let Some(lower) = lower {
1027 let to_df_literal = |value| {
1028 let value = Value::from(value);
1029
1030 value.try_to_scalar_value(&value.data_type()).unwrap()
1031 };
1032 let lower = to_df_literal(lower);
1033 let upper = to_df_literal(upper.unwrap());
1034 let expr = col(&col_name)
1035 .gt_eq(lit(lower))
1036 .and(col(&col_name).lt_eq(lit(upper)));
1037 let mut add_filter = AddFilterRewriter::new(expr);
1038 let plan = plan.rewrite(&mut add_filter).unwrap().data;
1039 df_plan_to_sql(&plan).unwrap()
1040 } else {
1041 sql.to_string()
1042 };
1043 assert_eq!(expected_unparsed, new_sql);
1044 }
1045 }
1046
1047 #[tokio::test]
1048 async fn test_complex_plans_skip_time_window_expr() {
1049 let query_engine = create_test_query_engine();
1050 let ctx = QueryContext::arc();
1051
1052 let testcases = [
1053 r#"
1057SELECT
1058 l.number,
1059 date_bin('5 minutes', l.ts) AS time_window
1060FROM numbers_with_ts l
1061JOIN numbers_with_ts r ON l.number = r.number
1062GROUP BY l.number, time_window
1063"#,
1064 r#"
1067SELECT number, time_window
1068FROM (
1069 SELECT
1070 number,
1071 time_window,
1072 row_number() OVER (PARTITION BY number ORDER BY time_window DESC) AS rn
1073 FROM (
1074 SELECT number, date_bin('5 minutes', ts) AS time_window
1075 FROM numbers_with_ts
1076 GROUP BY number, time_window
1077 )
1078)
1079WHERE rn = 1
1080"#,
1081 r#"
1083SELECT date_bin('5 minutes', ts) AS time_window
1084FROM numbers_with_ts
1085GROUP BY time_window
1086UNION ALL
1087SELECT date_bin('5 minutes', ts) AS time_window
1088FROM numbers_with_ts
1089GROUP BY time_window
1090"#,
1091 r#"
1094SELECT max(cnt)
1095FROM (
1096 SELECT date_bin('5 minutes', ts) AS time_window, count(number) AS cnt
1097 FROM numbers_with_ts
1098 GROUP BY time_window
1099)
1100"#,
1101 r#"
1104SELECT date_bin('5 minutes', ts) AS time_window
1105FROM numbers_with_ts
1106WHERE number IN (SELECT number FROM numbers_with_ts)
1107GROUP BY time_window
1108"#,
1109 r#"
1113SELECT date_bin('5 minutes', ts) AS time_window
1114FROM numbers_with_ts
1115GROUP BY time_window
1116ORDER BY time_window
1117"#,
1118 r#"
1121SELECT DISTINCT time_window
1122FROM (
1123 SELECT date_bin('5 minutes', ts) AS time_window
1124 FROM numbers_with_ts
1125 GROUP BY time_window
1126)
1127"#,
1128 r#"
1130SELECT date_bin('5 minutes', ts) AS time_window
1131FROM numbers_with_ts
1132GROUP BY time_window
1133LIMIT 10
1134"#,
1135 r#"
1139SELECT date_bin('5 minutes', l.ts) AS time_window
1140FROM numbers_with_ts l, numbers_with_ts r
1141GROUP BY time_window
1142"#,
1143 r#"
1147SELECT date_bin('5 minutes', l.ts) AS time_window
1148FROM numbers_with_ts l CROSS JOIN (VALUES (1)) AS v(x)
1149GROUP BY time_window
1150"#,
1151 ];
1152
1153 for sql in testcases {
1154 let plan = sql_to_df_plan(ctx.clone(), query_engine.clone(), sql, true)
1155 .await
1156 .unwrap();
1157 let (_, lower, upper) = find_plan_time_window_bound(
1158 &plan,
1159 Timestamp::new(23, TimeUnit::Millisecond),
1160 ctx.clone(),
1161 query_engine.clone(),
1162 )
1163 .await
1164 .unwrap();
1165
1166 assert_eq!(None, lower, "query should not have TWE: {sql}");
1167 assert_eq!(None, upper, "query should not have TWE: {sql}");
1168 }
1169 }
1170
1171 #[tokio::test]
1172 async fn test_simple_single_source_aggregate_keeps_time_window_expr() {
1173 let query_engine = create_test_query_engine();
1174 let ctx = QueryContext::arc();
1175
1176 let sql = r#"
1177SELECT max(number) AS max_number, date_bin('5 minutes', ts) AS time_window
1178FROM numbers_with_ts
1179GROUP BY time_window
1180"#;
1181 let plan = sql_to_df_plan(ctx.clone(), query_engine.clone(), sql, true)
1182 .await
1183 .unwrap();
1184 let (_, lower, upper) = find_plan_time_window_bound(
1185 &plan,
1186 Timestamp::new(23, TimeUnit::Millisecond),
1187 ctx.clone(),
1188 query_engine.clone(),
1189 )
1190 .await
1191 .unwrap();
1192
1193 assert_eq!(Some(Timestamp::new(0, TimeUnit::Millisecond)), lower);
1194 assert_eq!(Some(Timestamp::new(300000, TimeUnit::Millisecond)), upper);
1195 }
1196}