1use std::collections::BTreeMap;
18use std::time::Duration;
19
20use common_telemetry::debug;
21use common_telemetry::tracing::warn;
22use common_time::Timestamp;
23use datatypes::value::Value;
24use session::context::QueryContextRef;
25use snafu::{OptionExt, ResultExt, ensure};
26use tokio::sync::oneshot;
27use tokio::time::Instant;
28
29use crate::batching_mode::task::BatchingTask;
30use crate::batching_mode::time_window::TimeWindowExpr;
31use crate::error::{DatatypesSnafu, InternalSnafu, TimeSnafu, UnexpectedSnafu};
32use crate::metrics::{
33 METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_CNT, METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_SIZE,
34 METRIC_FLOW_BATCHING_ENGINE_STALLED_WINDOW_SIZE,
35};
36use crate::{Error, FlowId};
37
38#[derive(Debug)]
40pub struct TaskState {
41 pub(crate) query_ctx: QueryContextRef,
43 last_update_time: Instant,
45 last_query_duration: Duration,
47 last_exec_time_millis: Option<i64>,
49 pub(crate) dirty_time_windows: DirtyTimeWindows,
52 exec_state: ExecState,
53 pub(crate) shutdown_rx: oneshot::Receiver<()>,
55 pub(crate) task_handle: Option<tokio::task::JoinHandle<()>>,
57}
58impl TaskState {
59 pub fn new(query_ctx: QueryContextRef, shutdown_rx: oneshot::Receiver<()>) -> Self {
60 Self {
61 query_ctx,
62 last_update_time: Instant::now(),
63 last_query_duration: Duration::from_secs(0),
64 last_exec_time_millis: None,
65 dirty_time_windows: Default::default(),
66 exec_state: ExecState::Idle,
67 shutdown_rx,
68 task_handle: None,
69 }
70 }
71
72 pub fn after_query_exec(&mut self, elapsed: Duration, is_succ: bool) {
75 self.exec_state = ExecState::Idle;
76 self.last_query_duration = elapsed;
77 self.last_update_time = Instant::now();
78 if is_succ {
79 self.last_exec_time_millis = Some(common_time::util::current_time_millis());
80 }
81 }
82
83 pub fn last_execution_time_millis(&self) -> Option<i64> {
84 self.last_exec_time_millis
85 }
86
87 pub fn get_next_start_query_time(
98 &self,
99 flow_id: FlowId,
100 time_window_size: &Option<Duration>,
101 min_refresh_duration: Duration,
102 max_timeout: Option<Duration>,
103 max_filter_num_per_query: usize,
104 ) -> Instant {
105 let lower = time_window_size.unwrap_or(min_refresh_duration);
107 let next_duration = self.last_query_duration.max(lower);
108 let next_duration = if let Some(max_timeout) = max_timeout {
109 next_duration.min(max_timeout)
110 } else {
111 next_duration
112 };
113
114 let cur_dirty_window_size = self.dirty_time_windows.window_size();
115 let max_query_update_range = (*time_window_size)
117 .unwrap_or_default()
118 .mul_f64(max_filter_num_per_query as f64);
119 if cur_dirty_window_size < max_query_update_range {
122 self.last_update_time + next_duration
123 } else {
124 debug!(
127 "Flow id = {}, still have too many {} dirty time window({:?}), execute immediately",
128 flow_id,
129 self.dirty_time_windows.windows.len(),
130 self.dirty_time_windows.windows
131 );
132 Instant::now()
133 }
134 }
135}
136
137#[derive(Debug, Clone)]
140pub struct DirtyTimeWindows {
141 windows: BTreeMap<Timestamp, Option<Timestamp>>,
144 max_filter_num_per_query: usize,
146 time_window_merge_threshold: usize,
149}
150
151impl DirtyTimeWindows {
152 pub fn new(max_filter_num_per_query: usize, time_window_merge_threshold: usize) -> Self {
153 Self {
154 windows: BTreeMap::new(),
155 max_filter_num_per_query,
156 time_window_merge_threshold,
157 }
158 }
159}
160
161impl Default for DirtyTimeWindows {
162 fn default() -> Self {
163 Self {
164 windows: BTreeMap::new(),
165 max_filter_num_per_query: 20,
166 time_window_merge_threshold: 3,
167 }
168 }
169}
170
171impl DirtyTimeWindows {
172 pub const MERGE_DIST: i32 = 3;
176
177 pub fn add_lower_bounds(&mut self, lower_bounds: impl Iterator<Item = Timestamp>) {
183 for lower_bound in lower_bounds {
184 let entry = self.windows.entry(lower_bound);
185 entry.or_insert(None);
186 }
187 }
188
189 pub fn window_size(&self) -> Duration {
190 let mut ret = Duration::from_secs(0);
191 for (start, end) in &self.windows {
192 if let Some(end) = end
193 && let Some(duration) = end.sub(start)
194 {
195 ret += duration.to_std().unwrap_or_default();
196 }
197 }
198 ret
199 }
200
201 pub fn add_window(&mut self, start: Timestamp, end: Option<Timestamp>) {
202 self.windows.insert(start, end);
203 }
204
205 pub fn add_windows(&mut self, time_ranges: Vec<(Timestamp, Timestamp)>) {
206 for (start, end) in time_ranges {
207 self.windows.insert(start, Some(end));
208 }
209 }
210
211 pub fn clean(&mut self) {
213 self.windows.clear();
214 }
215
216 pub fn set_dirty(&mut self) {
219 self.windows.insert(Timestamp::new_second(0), None);
220 }
221
222 pub fn len(&self) -> usize {
224 self.windows.len()
225 }
226
227 pub fn is_empty(&self) -> bool {
228 self.windows.is_empty()
229 }
230
231 pub fn effective_count(&self, window_size: &Duration) -> usize {
234 if self.windows.is_empty() {
235 return 0;
236 }
237 let window_size =
238 chrono::Duration::from_std(*window_size).unwrap_or(chrono::Duration::zero());
239 let total_window_time_range =
240 self.windows
241 .iter()
242 .fold(chrono::Duration::zero(), |acc, (start, end)| {
243 if let Some(end) = end {
244 acc + end.sub(start).unwrap_or(chrono::Duration::zero())
245 } else {
246 acc + window_size
247 }
248 });
249
250 if window_size.num_seconds() == 0 {
252 0
253 } else {
254 (total_window_time_range.num_seconds() / window_size.num_seconds()) as usize
255 }
256 }
257
258 pub fn gen_filter_exprs(
264 &mut self,
265 col_name: &str,
266 expire_lower_bound: Option<Timestamp>,
267 window_size: chrono::Duration,
268 window_cnt: usize,
269 flow_id: FlowId,
270 task_ctx: Option<&BatchingTask>,
271 ) -> Result<Option<FilterExprInfo>, Error> {
272 ensure!(
273 window_size.num_seconds() > 0,
274 UnexpectedSnafu {
275 reason: "window_size is zero, can't generate filter exprs",
276 }
277 );
278
279 debug!(
280 "expire_lower_bound: {:?}, window_size: {:?}",
281 expire_lower_bound.map(|t| t.to_iso8601_string()),
282 window_size
283 );
284 self.merge_dirty_time_windows(window_size, expire_lower_bound)?;
285
286 if self.windows.len() > self.max_filter_num_per_query {
287 let first_time_window = self.windows.first_key_value();
288 let last_time_window = self.windows.last_key_value();
289
290 if let Some(task_ctx) = task_ctx {
291 warn!(
292 "Flow id = {:?}, too many time windows: {}, only the first {} are taken for this query, the group by expression might be wrong. Time window expr={:?}, expire_after={:?}, first_time_window={:?}, last_time_window={:?}, the original query: {:?}",
293 task_ctx.config.flow_id,
294 self.windows.len(),
295 self.max_filter_num_per_query,
296 task_ctx.config.time_window_expr,
297 task_ctx.config.expire_after,
298 first_time_window,
299 last_time_window,
300 task_ctx.config.query
301 );
302 } else {
303 warn!(
304 "Flow id = {:?}, too many time windows: {}, only the first {} are taken for this query, the group by expression might be wrong. first_time_window={:?}, last_time_window={:?}",
305 flow_id,
306 self.windows.len(),
307 self.max_filter_num_per_query,
308 first_time_window,
309 last_time_window
310 )
311 }
312 }
313
314 let max_time_range = window_size * window_cnt as i32;
316
317 let mut to_be_query = BTreeMap::new();
318 let mut new_windows = self.windows.clone();
319 let mut cur_time_range = chrono::Duration::zero();
320 for (idx, (start, end)) in self.windows.iter().enumerate() {
321 let first_end = start
322 .add_duration(window_size.to_std().unwrap())
323 .context(TimeSnafu)?;
324 let end = end.unwrap_or(first_end);
325
326 if cur_time_range >= max_time_range {
328 break;
329 }
330
331 if idx >= window_cnt {
333 break;
334 }
335
336 let Some(x) = end.sub(start) else {
337 continue;
338 };
339 if cur_time_range + x <= max_time_range {
340 to_be_query.insert(*start, Some(end));
341 new_windows.remove(start);
342 cur_time_range += x;
343 } else {
344 let surplus = max_time_range - cur_time_range;
347 if surplus.num_seconds() <= window_size.num_seconds() {
348 break;
350 }
351 let times = surplus.num_seconds() / window_size.num_seconds();
352
353 let split_offset = window_size * times as i32;
354 let split_at = start
355 .add_duration(split_offset.to_std().unwrap())
356 .context(TimeSnafu)?;
357 to_be_query.insert(*start, Some(split_at));
358
359 new_windows.remove(start);
361 new_windows.insert(split_at, Some(end));
362 cur_time_range += split_offset;
363 break;
364 }
365 }
366
367 self.windows = new_windows;
368
369 METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_CNT
370 .with_label_values(&[flow_id.to_string().as_str()])
371 .observe(to_be_query.len() as f64);
372
373 let full_time_range = to_be_query
374 .iter()
375 .fold(chrono::Duration::zero(), |acc, (start, end)| {
376 if let Some(end) = end {
377 acc + end.sub(start).unwrap_or(chrono::Duration::zero())
378 } else {
379 acc + window_size
380 }
381 })
382 .num_seconds() as f64;
383 METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_SIZE
384 .with_label_values(&[flow_id.to_string().as_str()])
385 .observe(full_time_range);
386
387 let stalled_time_range =
388 self.windows
389 .iter()
390 .fold(chrono::Duration::zero(), |acc, (start, end)| {
391 if let Some(end) = end {
392 acc + end.sub(start).unwrap_or(chrono::Duration::zero())
393 } else {
394 acc + window_size
395 }
396 });
397
398 METRIC_FLOW_BATCHING_ENGINE_STALLED_WINDOW_SIZE
399 .with_label_values(&[flow_id.to_string().as_str()])
400 .observe(stalled_time_range.num_seconds() as f64);
401
402 let std_window_size = window_size.to_std().map_err(|e| {
403 InternalSnafu {
404 reason: e.to_string(),
405 }
406 .build()
407 })?;
408
409 let mut expr_lst = vec![];
410 let mut time_ranges = vec![];
411 for (start, end) in to_be_query.into_iter() {
412 let (start, end) = if let Some(ctx) = task_ctx {
414 let Some(time_window_expr) = &ctx.config.time_window_expr else {
415 UnexpectedSnafu {
416 reason: "time_window_expr is not set",
417 }
418 .fail()?
419 };
420 self.align_time_window(start, end, time_window_expr)?
421 } else {
422 (start, end)
423 };
424 let end = end.unwrap_or(start.add_duration(std_window_size).context(TimeSnafu)?);
425 time_ranges.push((start, end));
426
427 debug!(
428 "Time window start: {:?}, end: {:?}",
429 start.to_iso8601_string(),
430 end.to_iso8601_string()
431 );
432
433 use datafusion_expr::{col, lit};
434 let lower = to_df_literal(start)?;
435 let upper = to_df_literal(end)?;
436 let expr = col(col_name)
437 .gt_eq(lit(lower))
438 .and(col(col_name).lt(lit(upper)));
439 expr_lst.push(expr);
440 }
441 let expr = expr_lst.into_iter().reduce(|a, b| a.or(b));
442 let ret = expr.map(|expr| FilterExprInfo {
443 expr,
444 col_name: col_name.to_string(),
445 time_ranges,
446 window_size,
447 });
448 Ok(ret)
449 }
450
451 fn align_time_window(
452 &self,
453 start: Timestamp,
454 end: Option<Timestamp>,
455 time_window_expr: &TimeWindowExpr,
456 ) -> Result<(Timestamp, Option<Timestamp>), Error> {
457 let align_start = time_window_expr.eval(start)?.0.context(UnexpectedSnafu {
458 reason: format!(
459 "Failed to align start time {:?} with time window expr {:?}",
460 start, time_window_expr
461 ),
462 })?;
463 let align_end = end
464 .and_then(|end| {
465 time_window_expr
466 .eval(end)
467 .map(|r| if r.0 == Some(end) { r.0 } else { r.1 })
469 .transpose()
470 })
471 .transpose()?;
472 Ok((align_start, align_end))
473 }
474
475 pub fn merge_dirty_time_windows(
479 &mut self,
480 window_size: chrono::Duration,
481 expire_lower_bound: Option<Timestamp>,
482 ) -> Result<(), Error> {
483 if self.windows.is_empty() {
484 return Ok(());
485 }
486
487 let mut new_windows = BTreeMap::new();
488
489 let std_window_size = window_size.to_std().map_err(|e| {
490 InternalSnafu {
491 reason: e.to_string(),
492 }
493 .build()
494 })?;
495
496 let mut prev_tw = None;
498 for (lower_bound, upper_bound) in std::mem::take(&mut self.windows) {
499 if let Some(expire_lower_bound) = expire_lower_bound
501 && lower_bound < expire_lower_bound
502 {
503 continue;
504 }
505
506 let Some(prev_tw) = &mut prev_tw else {
507 prev_tw = Some((lower_bound, upper_bound));
508 continue;
509 };
510
511 let prev_upper = prev_tw
514 .1
515 .unwrap_or(prev_tw.0.add_duration(std_window_size).context(TimeSnafu)?);
516 prev_tw.1 = Some(prev_upper);
517
518 let cur_upper = upper_bound.unwrap_or(
519 lower_bound
520 .add_duration(std_window_size)
521 .context(TimeSnafu)?,
522 );
523
524 if lower_bound
525 .sub(&prev_upper)
526 .map(|dist| dist <= window_size * self.time_window_merge_threshold as i32)
527 .unwrap_or(false)
528 {
529 prev_tw.1 = Some(cur_upper);
530 } else {
531 new_windows.insert(prev_tw.0, prev_tw.1);
532 *prev_tw = (lower_bound, Some(cur_upper));
533 }
534 }
535
536 if let Some(prev_tw) = prev_tw {
537 new_windows.insert(prev_tw.0, prev_tw.1);
538 }
539
540 self.windows = new_windows;
541
542 Ok(())
543 }
544}
545
546fn to_df_literal(value: Timestamp) -> Result<datafusion_common::ScalarValue, Error> {
547 let value = Value::from(value);
548 let value = value
549 .try_to_scalar_value(&value.data_type())
550 .with_context(|_| DatatypesSnafu {
551 extra: format!("Failed to convert to scalar value: {}", value),
552 })?;
553 Ok(value)
554}
555
/// Execution state recorded in `TaskState`.
#[derive(Debug, Clone)]
enum ExecState {
    /// No query is running.
    Idle,
    /// A query is running.
    Executing,
}
561
562#[derive(Debug, Clone)]
564pub struct FilterExprInfo {
565 pub expr: datafusion_expr::Expr,
566 pub col_name: String,
567 pub time_ranges: Vec<(Timestamp, Timestamp)>,
568 pub window_size: chrono::Duration,
569}
570
571impl FilterExprInfo {
572 pub fn total_window_length(&self) -> chrono::Duration {
573 self.time_ranges
574 .iter()
575 .fold(chrono::Duration::zero(), |acc, (start, end)| {
576 acc + end.sub(start).unwrap_or(chrono::Duration::zero())
577 })
578 }
579}
580
581#[cfg(test)]
582mod test {
583 use pretty_assertions::assert_eq;
584 use session::context::QueryContext;
585
586 use super::*;
587 use crate::batching_mode::time_window::find_time_window_expr;
588 use crate::batching_mode::utils::sql_to_df_plan;
589 use crate::test_utils::create_test_query_engine;
590
/// The last-execution timestamp is only set by a successful query.
#[test]
fn test_task_state_records_last_execution_time() {
    let (_tx, rx) = tokio::sync::oneshot::channel();
    let mut state = TaskState::new(QueryContext::arc(), rx);
    let one_ms = std::time::Duration::from_millis(1);

    // Nothing has run yet.
    assert_eq!(None, state.last_execution_time_millis());

    // A failed run must not record a timestamp.
    state.after_query_exec(one_ms, false);
    assert_eq!(None, state.last_execution_time_millis());

    // A successful run does.
    state.after_query_exec(one_ms, true);
    assert!(state.last_execution_time_millis().is_some());
}
604
/// Checks the merged windows and the filter SQL for several lower-bound layouts.
#[test]
fn test_merge_dirty_time_windows() {
    let secs = Timestamp::new_second;
    let merge_dist = DirtyTimeWindows::default().time_window_merge_threshold as i64;
    let five_min = chrono::Duration::seconds(5 * 60);
    let three_sec = chrono::Duration::seconds(3);
    // Start of a window placed just beyond the merge distance after second 60.
    let beyond_merge = 60 + 3 * (DirtyTimeWindows::MERGE_DIST as i64 + 1);

    // Each case: (lower bounds, (window size, expire bound), merged windows, filter SQL).
    let testcases = vec![
        // Gap exactly at the merge threshold: merged.
        (
            vec![secs(0), secs((1 + merge_dist) * 5 * 60)],
            (five_min, None),
            BTreeMap::from([(secs(0), Some(secs((2 + merge_dist) * 5 * 60)))]),
            Some(
                "((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:25:00' AS TIMESTAMP)))",
            ),
        ),
        // Gap one window past the threshold: kept separate.
        (
            vec![secs(0), secs((2 + merge_dist) * 5 * 60)],
            (five_min, None),
            BTreeMap::from([
                (secs(0), Some(secs(5 * 60))),
                (
                    secs((2 + merge_dist) * 5 * 60),
                    Some(secs((3 + merge_dist) * 5 * 60)),
                ),
            ]),
            Some(
                "(((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:05:00' AS TIMESTAMP))) OR ((ts >= CAST('1970-01-01 00:25:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:30:00' AS TIMESTAMP))))",
            ),
        ),
        // Gap below the threshold: merged.
        (
            vec![secs(0), secs(merge_dist * 5 * 60)],
            (five_min, None),
            BTreeMap::from([(secs(0), Some(secs((1 + merge_dist) * 5 * 60)))]),
            Some(
                "((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:20:00' AS TIMESTAMP)))",
            ),
        ),
        // Three windows merged one after another.
        (
            vec![secs(0), secs(merge_dist * 3), secs(merge_dist * 3 * 2)],
            (three_sec, None),
            BTreeMap::from([(secs(0), Some(secs(merge_dist * 7)))]),
            Some(
                "((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:00:21' AS TIMESTAMP)))",
            ),
        ),
        // Twenty contiguous windows plus one too far away to merge.
        (
            Vec::from_iter(
                (0..20)
                    .map(|i| secs(i * 3))
                    .chain(std::iter::once(secs(beyond_merge))),
            ),
            (three_sec, None),
            BTreeMap::from([
                (secs(0), Some(secs(60))),
                (secs(beyond_merge), Some(secs(beyond_merge + 3))),
            ]),
            Some(
                "((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:01:00' AS TIMESTAMP)))",
            ),
        ),
        // One merged window longer than the per-query range: split for the filter.
        (
            Vec::from_iter((0..40).map(|i| secs(i * 3))),
            (three_sec, None),
            BTreeMap::from([(secs(0), Some(secs(40 * 3)))]),
            Some(
                "((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:01:00' AS TIMESTAMP)))",
            ),
        ),
        // A small window followed by a long one that has to be split.
        (
            Vec::from_iter(
                std::iter::once(secs(0)).chain((0..40).map(|i| secs(20 + i * 3))),
            ),
            (three_sec, None),
            BTreeMap::from([
                (secs(0), Some(secs(3))),
                (secs(20), Some(secs(140))),
            ]),
            Some(
                "(((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:00:03' AS TIMESTAMP))) OR ((ts >= CAST('1970-01-01 00:00:20' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:01:17' AS TIMESTAMP))))",
            ),
        ),
        // Every window starts before the expire bound: nothing is left.
        (
            vec![secs(0), secs(merge_dist * 5 * 60)],
            (five_min, Some(secs(merge_dist * 6 * 60))),
            BTreeMap::from([]),
            None,
        ),
    ];

    for (lower_bounds, (window_size, expire_lower_bound), expected, expected_filter_expr) in
        testcases
    {
        let mut dirty = DirtyTimeWindows::default();
        dirty.add_lower_bounds(lower_bounds.into_iter());
        dirty
            .merge_dirty_time_windows(window_size, expire_lower_bound)
            .unwrap();
        assert_eq!(expected, dirty.windows);

        let window_cnt = dirty.max_filter_num_per_query;
        let filter_expr = dirty
            .gen_filter_exprs("ts", expire_lower_bound, window_size, window_cnt, 0, None)
            .unwrap()
            .map(|info| info.expr);

        let unparser = datafusion::sql::unparser::Unparser::default();
        let to_sql = filter_expr
            .as_ref()
            .map(|expr| unparser.expr_to_sql(expr).unwrap().to_string());
        assert_eq!(expected_filter_expr, to_sql.as_deref());
    }
}
768
769 #[tokio::test]
770 async fn test_align_time_window() {
771 type TimeWindow = (Timestamp, Option<Timestamp>);
772 struct TestCase {
773 sql: String,
774 aligns: Vec<(TimeWindow, TimeWindow)>,
775 }
776 let testcases: Vec<TestCase> = vec![TestCase{
777 sql: "SELECT date_bin(INTERVAL '5 second', ts) AS time_window FROM numbers_with_ts GROUP BY time_window;".to_string(),
778 aligns: vec![
779 ((Timestamp::new_second(3), None), (Timestamp::new_second(0), None)),
780 ((Timestamp::new_second(8), None), (Timestamp::new_second(5), None)),
781 ((Timestamp::new_second(8), Some(Timestamp::new_second(10))), (Timestamp::new_second(5), Some(Timestamp::new_second(10)))),
782 ((Timestamp::new_second(8), Some(Timestamp::new_second(9))), (Timestamp::new_second(5), Some(Timestamp::new_second(10)))),
783 ],
784 }];
785
786 let query_engine = create_test_query_engine();
787 let ctx = QueryContext::arc();
788 for TestCase { sql, aligns } in testcases {
789 let plan = sql_to_df_plan(ctx.clone(), query_engine.clone(), &sql, true)
790 .await
791 .unwrap();
792
793 let (column_name, time_window_expr, _, df_schema) = find_time_window_expr(
794 &plan,
795 query_engine.engine_state().catalog_manager().clone(),
796 ctx.clone(),
797 )
798 .await
799 .unwrap();
800
801 let time_window_expr = time_window_expr
802 .map(|expr| {
803 TimeWindowExpr::from_expr(
804 &expr,
805 &column_name,
806 &df_schema,
807 &query_engine.engine_state().session_state(),
808 )
809 })
810 .transpose()
811 .unwrap()
812 .unwrap();
813
814 let dirty = DirtyTimeWindows::default();
815 for (before_align, expected_after_align) in aligns {
816 let after_align = dirty
817 .align_time_window(before_align.0, before_align.1, &time_window_expr)
818 .unwrap();
819 assert_eq!(expected_after_align, after_align);
820 }
821 }
822 }
823}