flow/batching_mode/
state.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Batching mode task state, which changes frequently
16
17use std::collections::BTreeMap;
18use std::time::Duration;
19
20use common_telemetry::debug;
21use common_telemetry::tracing::warn;
22use common_time::Timestamp;
23use datatypes::value::Value;
24use session::context::QueryContextRef;
25use snafu::{ensure, OptionExt, ResultExt};
26use tokio::sync::oneshot;
27use tokio::time::Instant;
28
29use crate::batching_mode::task::BatchingTask;
30use crate::batching_mode::time_window::TimeWindowExpr;
31use crate::batching_mode::MIN_REFRESH_DURATION;
32use crate::error::{DatatypesSnafu, InternalSnafu, TimeSnafu, UnexpectedSnafu};
33use crate::metrics::{
34    METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_CNT, METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_SIZE,
35    METRIC_FLOW_BATCHING_ENGINE_STALLED_WINDOW_SIZE,
36};
37use crate::{Error, FlowId};
38
/// The state of the [`BatchingTask`].
///
/// Holds the query context, the timing of the most recent query, the dirty time windows
/// that still have to be refreshed, and the shutdown receiver / join handle of the task.
#[derive(Debug)]
pub struct TaskState {
    /// Query context
    pub(crate) query_ctx: QueryContextRef,
    /// Instant at which the last query completed
    /// (initialised to the creation time of this state).
    last_update_time: Instant,
    /// How long the last query took.
    last_query_duration: Duration,
    /// Dirty time windows that still need to be updated,
    /// kept as a non-overlapping mapping of `start -> end`.
    pub(crate) dirty_time_windows: DirtyTimeWindows,
    /// Whether a query is currently running; reset to idle after every query.
    exec_state: ExecState,
    /// Shutdown receiver
    pub(crate) shutdown_rx: oneshot::Receiver<()>,
    /// Task handle
    pub(crate) task_handle: Option<tokio::task::JoinHandle<()>>,
}
57impl TaskState {
58    pub fn new(query_ctx: QueryContextRef, shutdown_rx: oneshot::Receiver<()>) -> Self {
59        Self {
60            query_ctx,
61            last_update_time: Instant::now(),
62            last_query_duration: Duration::from_secs(0),
63            dirty_time_windows: Default::default(),
64            exec_state: ExecState::Idle,
65            shutdown_rx,
66            task_handle: None,
67        }
68    }
69
70    /// called after last query is done
71    /// `is_succ` indicate whether the last query is successful
72    pub fn after_query_exec(&mut self, elapsed: Duration, _is_succ: bool) {
73        self.exec_state = ExecState::Idle;
74        self.last_query_duration = elapsed;
75        self.last_update_time = Instant::now();
76    }
77
78    /// Compute the next query delay based on the time window size or the last query duration.
79    /// Aiming to avoid too frequent queries. But also not too long delay.
80    ///
81    /// next wait time is calculated as:
82    /// last query duration, capped by [max(min_run_interval, time_window_size), max_timeout],
83    /// note at most wait for `max_timeout`.
84    ///
85    /// if current the dirty time range is longer than one query can handle,
86    /// execute immediately to faster clean up dirty time windows.
87    ///
88    pub fn get_next_start_query_time(
89        &self,
90        flow_id: FlowId,
91        time_window_size: &Option<Duration>,
92        max_timeout: Option<Duration>,
93    ) -> Instant {
94        // = last query duration, capped by [max(min_run_interval, time_window_size), max_timeout], note at most `max_timeout`
95        let lower = time_window_size.unwrap_or(MIN_REFRESH_DURATION);
96        let next_duration = self.last_query_duration.max(lower);
97        let next_duration = if let Some(max_timeout) = max_timeout {
98            next_duration.min(max_timeout)
99        } else {
100            next_duration
101        };
102
103        let cur_dirty_window_size = self.dirty_time_windows.window_size();
104        // compute how much time range can be handled in one query
105        let max_query_update_range = (*time_window_size)
106            .unwrap_or_default()
107            .mul_f64(DirtyTimeWindows::MAX_FILTER_NUM as f64);
108        // if dirty time range is more than one query can handle, execute immediately
109        // to faster clean up dirty time windows
110        if cur_dirty_window_size < max_query_update_range {
111            self.last_update_time + next_duration
112        } else {
113            // if dirty time windows can't be clean up in one query, execute immediately to faster
114            // clean up dirty time windows
115            debug!(
116                "Flow id = {}, still have too many {} dirty time window({:?}), execute immediately",
117                flow_id,
118                self.dirty_time_windows.windows.len(),
119                self.dirty_time_windows.windows
120            );
121            Instant::now()
122        }
123    }
124}
125
/// Keeps a record of dirty time windows, i.e. time windows that have had new data
/// inserted since the last query.
#[derive(Debug, Clone, Default)]
pub struct DirtyTimeWindows {
    /// Mapping of each window's `start -> end`, ordered by `start`.
    /// `end` is exclusive and optional; where it is `None`, the code in this file
    /// substitutes `start + window_size`.
    windows: BTreeMap<Timestamp, Option<Timestamp>>,
}
134
135impl DirtyTimeWindows {
    /// Time window merge distance, in multiples of the window size: two windows whose
    /// gap is at most `window_size * MERGE_DIST` are merged into one.
    ///
    /// TODO(discord9): make those configurable
    pub const MERGE_DIST: i32 = 3;

    /// Maximum number of filters allowed in a single query
    pub const MAX_FILTER_NUM: usize = 20;
143
144    /// Add lower bounds to the dirty time windows. Upper bounds are ignored.
145    ///
146    /// # Arguments
147    ///
148    /// * `lower_bounds` - An iterator of lower bounds to be added.
149    pub fn add_lower_bounds(&mut self, lower_bounds: impl Iterator<Item = Timestamp>) {
150        for lower_bound in lower_bounds {
151            let entry = self.windows.entry(lower_bound);
152            entry.or_insert(None);
153        }
154    }
155
156    pub fn window_size(&self) -> Duration {
157        let mut ret = Duration::from_secs(0);
158        for (start, end) in &self.windows {
159            if let Some(end) = end {
160                if let Some(duration) = end.sub(start) {
161                    ret += duration.to_std().unwrap_or_default();
162                }
163            }
164        }
165        ret
166    }
167
168    pub fn add_window(&mut self, start: Timestamp, end: Option<Timestamp>) {
169        self.windows.insert(start, end);
170    }
171
172    /// Clean all dirty time windows, useful when can't found time window expr
173    pub fn clean(&mut self) {
174        self.windows.clear();
175    }
176
177    /// Number of dirty windows.
178    pub fn len(&self) -> usize {
179        self.windows.len()
180    }
181
182    /// Get the effective count of time windows, which is the number of time windows that can be
183    /// used for query, compute from total time window range divided by `window_size`.
184    pub fn effective_count(&self, window_size: &Duration) -> usize {
185        if self.windows.is_empty() {
186            return 0;
187        }
188        let window_size =
189            chrono::Duration::from_std(*window_size).unwrap_or(chrono::Duration::zero());
190        let total_window_time_range =
191            self.windows
192                .iter()
193                .fold(chrono::Duration::zero(), |acc, (start, end)| {
194                    if let Some(end) = end {
195                        acc + end.sub(start).unwrap_or(chrono::Duration::zero())
196                    } else {
197                        acc + window_size
198                    }
199                });
200
201        // not sure window_size is zero have any meaning, but just in case
202        if window_size.num_seconds() == 0 {
203            0
204        } else {
205            (total_window_time_range.num_seconds() / window_size.num_seconds()) as usize
206        }
207    }
208
209    /// Generate all filter expressions consuming all time windows
210    ///
211    /// there is two limits:
212    /// - shouldn't return a too long time range(<=`window_size * window_cnt`), so that the query can be executed in a reasonable time
213    /// - shouldn't return too many time range exprs, so that the query can be parsed properly instead of causing parser to overflow
214    pub fn gen_filter_exprs(
215        &mut self,
216        col_name: &str,
217        expire_lower_bound: Option<Timestamp>,
218        window_size: chrono::Duration,
219        window_cnt: usize,
220        flow_id: FlowId,
221        task_ctx: Option<&BatchingTask>,
222    ) -> Result<Option<datafusion_expr::Expr>, Error> {
223        ensure!(
224            window_size.num_seconds() > 0,
225            UnexpectedSnafu {
226                reason: "window_size is zero, can't generate filter exprs",
227            }
228        );
229
230        debug!(
231            "expire_lower_bound: {:?}, window_size: {:?}",
232            expire_lower_bound.map(|t| t.to_iso8601_string()),
233            window_size
234        );
235        self.merge_dirty_time_windows(window_size, expire_lower_bound)?;
236
237        if self.windows.len() > Self::MAX_FILTER_NUM {
238            let first_time_window = self.windows.first_key_value();
239            let last_time_window = self.windows.last_key_value();
240
241            if let Some(task_ctx) = task_ctx {
242                warn!(
243                "Flow id = {:?}, too many time windows: {}, only the first {} are taken for this query, the group by expression might be wrong. Time window expr={:?}, expire_after={:?}, first_time_window={:?}, last_time_window={:?}, the original query: {:?}",
244                task_ctx.config.flow_id,
245                self.windows.len(),
246                Self::MAX_FILTER_NUM,
247                task_ctx.config.time_window_expr,
248                task_ctx.config.expire_after,
249                first_time_window,
250                last_time_window,
251                task_ctx.config.query
252            );
253            } else {
254                warn!("Flow id = {:?}, too many time windows: {}, only the first {} are taken for this query, the group by expression might be wrong. first_time_window={:?}, last_time_window={:?}",
255                flow_id,
256                self.windows.len(),
257                Self::MAX_FILTER_NUM,
258                first_time_window,
259                last_time_window
260                )
261            }
262        }
263
264        // get the first `window_cnt` time windows
265        let max_time_range = window_size * window_cnt as i32;
266
267        let mut to_be_query = BTreeMap::new();
268        let mut new_windows = self.windows.clone();
269        let mut cur_time_range = chrono::Duration::zero();
270        for (idx, (start, end)) in self.windows.iter().enumerate() {
271            let first_end = start
272                .add_duration(window_size.to_std().unwrap())
273                .context(TimeSnafu)?;
274            let end = end.unwrap_or(first_end);
275
276            // if time range is too long, stop
277            if cur_time_range >= max_time_range {
278                break;
279            }
280
281            // if we have enough time windows, stop
282            if idx >= window_cnt {
283                break;
284            }
285
286            let Some(x) = end.sub(start) else {
287                continue;
288            };
289            if cur_time_range + x <= max_time_range {
290                to_be_query.insert(*start, Some(end));
291                new_windows.remove(start);
292                cur_time_range += x;
293            } else {
294                // too large a window, split it
295                // split at window_size * times
296                let surplus = max_time_range - cur_time_range;
297                if surplus.num_seconds() <= window_size.num_seconds() {
298                    // Skip splitting if surplus is smaller than window_size
299                    break;
300                }
301                let times = surplus.num_seconds() / window_size.num_seconds();
302
303                let split_offset = window_size * times as i32;
304                let split_at = start
305                    .add_duration(split_offset.to_std().unwrap())
306                    .context(TimeSnafu)?;
307                to_be_query.insert(*start, Some(split_at));
308
309                // remove the original window
310                new_windows.remove(start);
311                new_windows.insert(split_at, Some(end));
312                cur_time_range += split_offset;
313                break;
314            }
315        }
316
317        self.windows = new_windows;
318
319        METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_CNT
320            .with_label_values(&[flow_id.to_string().as_str()])
321            .observe(to_be_query.len() as f64);
322
323        let full_time_range = to_be_query
324            .iter()
325            .fold(chrono::Duration::zero(), |acc, (start, end)| {
326                if let Some(end) = end {
327                    acc + end.sub(start).unwrap_or(chrono::Duration::zero())
328                } else {
329                    acc + window_size
330                }
331            })
332            .num_seconds() as f64;
333        METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_SIZE
334            .with_label_values(&[flow_id.to_string().as_str()])
335            .observe(full_time_range);
336
337        let stalled_time_range =
338            self.windows
339                .iter()
340                .fold(chrono::Duration::zero(), |acc, (start, end)| {
341                    if let Some(end) = end {
342                        acc + end.sub(start).unwrap_or(chrono::Duration::zero())
343                    } else {
344                        acc + window_size
345                    }
346                });
347
348        METRIC_FLOW_BATCHING_ENGINE_STALLED_WINDOW_SIZE
349            .with_label_values(&[flow_id.to_string().as_str()])
350            .observe(stalled_time_range.num_seconds() as f64);
351
352        let mut expr_lst = vec![];
353        for (start, end) in to_be_query.into_iter() {
354            // align using time window exprs
355            let (start, end) = if let Some(ctx) = task_ctx {
356                let Some(time_window_expr) = &ctx.config.time_window_expr else {
357                    UnexpectedSnafu {
358                        reason: "time_window_expr is not set",
359                    }
360                    .fail()?
361                };
362                self.align_time_window(start, end, time_window_expr)?
363            } else {
364                (start, end)
365            };
366            debug!(
367                "Time window start: {:?}, end: {:?}",
368                start.to_iso8601_string(),
369                end.map(|t| t.to_iso8601_string())
370            );
371
372            use datafusion_expr::{col, lit};
373            let lower = to_df_literal(start)?;
374            let upper = end.map(to_df_literal).transpose()?;
375            let expr = if let Some(upper) = upper {
376                col(col_name)
377                    .gt_eq(lit(lower))
378                    .and(col(col_name).lt(lit(upper)))
379            } else {
380                col(col_name).gt_eq(lit(lower))
381            };
382            expr_lst.push(expr);
383        }
384        let expr = expr_lst.into_iter().reduce(|a, b| a.or(b));
385        Ok(expr)
386    }
387
388    fn align_time_window(
389        &self,
390        start: Timestamp,
391        end: Option<Timestamp>,
392        time_window_expr: &TimeWindowExpr,
393    ) -> Result<(Timestamp, Option<Timestamp>), Error> {
394        let align_start = time_window_expr.eval(start)?.0.context(UnexpectedSnafu {
395            reason: format!(
396                "Failed to align start time {:?} with time window expr {:?}",
397                start, time_window_expr
398            ),
399        })?;
400        let align_end = end
401            .and_then(|end| {
402                time_window_expr
403                    .eval(end)
404                    // if after aligned, end is the same, then use end(because it's already aligned) else use aligned end
405                    .map(|r| if r.0 == Some(end) { r.0 } else { r.1 })
406                    .transpose()
407            })
408            .transpose()?;
409        Ok((align_start, align_end))
410    }
411
412    /// Merge time windows that overlaps or get too close
413    ///
414    /// TODO(discord9): not merge and prefer to send smaller time windows? how?
415    pub fn merge_dirty_time_windows(
416        &mut self,
417        window_size: chrono::Duration,
418        expire_lower_bound: Option<Timestamp>,
419    ) -> Result<(), Error> {
420        if self.windows.is_empty() {
421            return Ok(());
422        }
423
424        let mut new_windows = BTreeMap::new();
425
426        let std_window_size = window_size.to_std().map_err(|e| {
427            InternalSnafu {
428                reason: e.to_string(),
429            }
430            .build()
431        })?;
432
433        // previous time window
434        let mut prev_tw = None;
435        for (lower_bound, upper_bound) in std::mem::take(&mut self.windows) {
436            // filter out expired time window
437            if let Some(expire_lower_bound) = expire_lower_bound {
438                if lower_bound < expire_lower_bound {
439                    continue;
440                }
441            }
442
443            let Some(prev_tw) = &mut prev_tw else {
444                prev_tw = Some((lower_bound, upper_bound));
445                continue;
446            };
447
448            // if cur.lower - prev.upper <= window_size * MERGE_DIST, merge
449            // this also deal with overlap windows because cur.lower > prev.lower is always true
450            let prev_upper = prev_tw
451                .1
452                .unwrap_or(prev_tw.0.add_duration(std_window_size).context(TimeSnafu)?);
453            prev_tw.1 = Some(prev_upper);
454
455            let cur_upper = upper_bound.unwrap_or(
456                lower_bound
457                    .add_duration(std_window_size)
458                    .context(TimeSnafu)?,
459            );
460
461            if lower_bound
462                .sub(&prev_upper)
463                .map(|dist| dist <= window_size * Self::MERGE_DIST)
464                .unwrap_or(false)
465            {
466                prev_tw.1 = Some(cur_upper);
467            } else {
468                new_windows.insert(prev_tw.0, prev_tw.1);
469                *prev_tw = (lower_bound, Some(cur_upper));
470            }
471        }
472
473        if let Some(prev_tw) = prev_tw {
474            new_windows.insert(prev_tw.0, prev_tw.1);
475        }
476
477        self.windows = new_windows;
478
479        Ok(())
480    }
481}
482
483fn to_df_literal(value: Timestamp) -> Result<datafusion_common::ScalarValue, Error> {
484    let value = Value::from(value);
485    let value = value
486        .try_to_scalar_value(&value.data_type())
487        .with_context(|_| DatatypesSnafu {
488            extra: format!("Failed to convert to scalar value: {}", value),
489        })?;
490    Ok(value)
491}
492
/// Execution state of a batching task.
#[derive(Debug, Clone)]
enum ExecState {
    /// No query is running; this is the initial state and the state after each query.
    Idle,
    /// A query is currently running.
    Executing,
}
498
499#[cfg(test)]
500mod test {
501    use pretty_assertions::assert_eq;
502    use session::context::QueryContext;
503
504    use super::*;
505    use crate::batching_mode::time_window::find_time_window_expr;
506    use crate::batching_mode::utils::sql_to_df_plan;
507    use crate::test_utils::create_test_query_engine;
508
    /// Checks both the merged dirty windows and the filter expression generated from them.
    ///
    /// Each test case is a tuple of:
    /// 1. the lower bounds to register as dirty windows
    /// 2. `(window_size, expire_lower_bound)`
    /// 3. the expected windows after `merge_dirty_time_windows`
    /// 4. the expected filter expression (rendered as SQL) from `gen_filter_exprs`,
    ///    or `None` when nothing is left to query
    #[test]
    fn test_merge_dirty_time_windows() {
        let testcases = vec![
            // gap is exactly `MERGE_DIST` windows: just close enough to merge
            (
                vec![
                    Timestamp::new_second(0),
                    Timestamp::new_second((1 + DirtyTimeWindows::MERGE_DIST as i64) * 5 * 60),
                ],
                (chrono::Duration::seconds(5 * 60), None),
                BTreeMap::from([(
                    Timestamp::new_second(0),
                    Some(Timestamp::new_second(
                        (2 + DirtyTimeWindows::MERGE_DIST as i64) * 5 * 60,
                    )),
                )]),
                Some(
                    "((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:25:00' AS TIMESTAMP)))",
                )
            ),
            // gap is one window more than `MERGE_DIST`: the time windows stay separate
            (
                vec![
                    Timestamp::new_second(0),
                    Timestamp::new_second((2 + DirtyTimeWindows::MERGE_DIST as i64) * 5 * 60),
                ],
                (chrono::Duration::seconds(5 * 60), None),
                BTreeMap::from([
                    (
                        Timestamp::new_second(0),
                        Some(Timestamp::new_second(5 * 60)),
                    ),
                    (
                        Timestamp::new_second((2 + DirtyTimeWindows::MERGE_DIST as i64) * 5 * 60),
                        Some(Timestamp::new_second(
                            (3 + DirtyTimeWindows::MERGE_DIST as i64) * 5 * 60,
                        )),
                    ),
                ]),
                Some(
                    "(((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:05:00' AS TIMESTAMP))) OR ((ts >= CAST('1970-01-01 00:25:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:30:00' AS TIMESTAMP))))",
                )
            ),
            // overlapping: gap is smaller than the merge distance
            (
                vec![
                    Timestamp::new_second(0),
                    Timestamp::new_second((DirtyTimeWindows::MERGE_DIST as i64) * 5 * 60),
                ],
                (chrono::Duration::seconds(5 * 60), None),
                BTreeMap::from([(
                    Timestamp::new_second(0),
                    Some(Timestamp::new_second(
                        (1 + DirtyTimeWindows::MERGE_DIST as i64) * 5 * 60,
                    )),
                )]),
                Some(
                    "((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:20:00' AS TIMESTAMP)))",
                )
            ),
            // complex overlapping: three windows merged one after another
            (
                vec![
                    Timestamp::new_second(0),
                    Timestamp::new_second((DirtyTimeWindows::MERGE_DIST as i64) * 3),
                    Timestamp::new_second((DirtyTimeWindows::MERGE_DIST as i64) * 3 * 2),
                ],
                (chrono::Duration::seconds(3), None),
                BTreeMap::from([(
                    Timestamp::new_second(0),
                    Some(Timestamp::new_second(
                        (DirtyTimeWindows::MERGE_DIST as i64) * 7
                    )),
                )]),
                Some(
                    "((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:00:21' AS TIMESTAMP)))",
                )
            ),
            // split range: the first merged window already uses up the whole time range,
            // so the second window is left for a later query
            (
                Vec::from_iter((0..20).map(|i|Timestamp::new_second(i*3)).chain(std::iter::once(
                    Timestamp::new_second(60 + 3 * (DirtyTimeWindows::MERGE_DIST as i64 + 1)),
                ))),
                (chrono::Duration::seconds(3), None),
                BTreeMap::from([
                (
                    Timestamp::new_second(0),
                    Some(Timestamp::new_second(
                        60
                    )),
                ),
                (
                    Timestamp::new_second(60 + 3 * (DirtyTimeWindows::MERGE_DIST as i64 + 1)),
                    Some(Timestamp::new_second(
                        60 + 3 * (DirtyTimeWindows::MERGE_DIST as i64 + 1) + 3
                    )),
                )]),
                Some(
                    "((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:01:00' AS TIMESTAMP)))",
                )
            ),
            // split 2 min into 1 min: only the first half of the merged window is queried
            (
                Vec::from_iter((0..40).map(|i|Timestamp::new_second(i*3))),
                (chrono::Duration::seconds(3), None),
                BTreeMap::from([
                (
                    Timestamp::new_second(0),
                    Some(Timestamp::new_second(
                        40 * 3
                    )),
                )]),
                Some(
                    "((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:01:00' AS TIMESTAMP)))",
                )
            ),
            // split 3s + 2min into 3s + 57s: the second window only gets the remaining range
            (
                Vec::from_iter(std::iter::once(Timestamp::new_second(0)).chain((0..40).map(|i|Timestamp::new_second(20+i*3)))),
                (chrono::Duration::seconds(3), None),
                BTreeMap::from([
                (
                    Timestamp::new_second(0),
                    Some(Timestamp::new_second(
                        3
                    )),
                ),(
                    Timestamp::new_second(20),
                    Some(Timestamp::new_second(
                        140
                    )),
                )]),
                Some(
                    "(((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:00:03' AS TIMESTAMP))) OR ((ts >= CAST('1970-01-01 00:00:20' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:01:17' AS TIMESTAMP))))",
                )
            ),
            // expired: every window starts before the expire lower bound
            (
                vec![
                    Timestamp::new_second(0),
                    Timestamp::new_second((DirtyTimeWindows::MERGE_DIST as i64) * 5 * 60),
                ],
                (
                    chrono::Duration::seconds(5 * 60),
                    Some(Timestamp::new_second(
                        (DirtyTimeWindows::MERGE_DIST as i64) * 6 * 60,
                    )),
                ),
                BTreeMap::from([]),
                None
            ),
        ];
        // For each case: merge first and compare the windows, then generate the filter
        // expression and compare its SQL rendering.
        for (lower_bounds, (window_size, expire_lower_bound), expected, expected_filter_expr) in
            testcases
        {
            let mut dirty = DirtyTimeWindows::default();
            dirty.add_lower_bounds(lower_bounds.into_iter());
            dirty
                .merge_dirty_time_windows(window_size, expire_lower_bound)
                .unwrap();
            assert_eq!(expected, dirty.windows);
            let filter_expr = dirty
                .gen_filter_exprs(
                    "ts",
                    expire_lower_bound,
                    window_size,
                    DirtyTimeWindows::MAX_FILTER_NUM,
                    0,
                    None,
                )
                .unwrap();

            let unparser = datafusion::sql::unparser::Unparser::default();
            let to_sql = filter_expr
                .as_ref()
                .map(|e| unparser.expr_to_sql(e).unwrap().to_string());
            assert_eq!(expected_filter_expr, to_sql.as_deref());
        }
    }
690
691    #[tokio::test]
692    async fn test_align_time_window() {
693        type TimeWindow = (Timestamp, Option<Timestamp>);
694        struct TestCase {
695            sql: String,
696            aligns: Vec<(TimeWindow, TimeWindow)>,
697        }
698        let testcases: Vec<TestCase> = vec![TestCase{
699            sql: "SELECT date_bin(INTERVAL '5 second', ts) AS time_window FROM numbers_with_ts GROUP BY time_window;".to_string(),
700            aligns: vec![
701                ((Timestamp::new_second(3), None), (Timestamp::new_second(0), None)),
702                ((Timestamp::new_second(8), None), (Timestamp::new_second(5), None)),
703                ((Timestamp::new_second(8), Some(Timestamp::new_second(10))), (Timestamp::new_second(5), Some(Timestamp::new_second(10)))),
704                ((Timestamp::new_second(8), Some(Timestamp::new_second(9))), (Timestamp::new_second(5), Some(Timestamp::new_second(10)))),
705            ],
706        }];
707
708        let query_engine = create_test_query_engine();
709        let ctx = QueryContext::arc();
710        for TestCase { sql, aligns } in testcases {
711            let plan = sql_to_df_plan(ctx.clone(), query_engine.clone(), &sql, true)
712                .await
713                .unwrap();
714
715            let (column_name, time_window_expr, _, df_schema) = find_time_window_expr(
716                &plan,
717                query_engine.engine_state().catalog_manager().clone(),
718                ctx.clone(),
719            )
720            .await
721            .unwrap();
722
723            let time_window_expr = time_window_expr
724                .map(|expr| {
725                    TimeWindowExpr::from_expr(
726                        &expr,
727                        &column_name,
728                        &df_schema,
729                        &query_engine.engine_state().session_state(),
730                    )
731                })
732                .transpose()
733                .unwrap()
734                .unwrap();
735
736            let dirty = DirtyTimeWindows::default();
737            for (before_align, expected_after_align) in aligns {
738                let after_align = dirty
739                    .align_time_window(before_align.0, before_align.1, &time_window_expr)
740                    .unwrap();
741                assert_eq!(expected_after_align, after_align);
742            }
743        }
744    }
745}