flow/batching_mode/
state.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Batching mode task state, which changes frequently
16
17use std::collections::BTreeMap;
18use std::time::Duration;
19
20use common_telemetry::debug;
21use common_telemetry::tracing::warn;
22use common_time::Timestamp;
23use datatypes::value::Value;
24use session::context::QueryContextRef;
25use snafu::{OptionExt, ResultExt, ensure};
26use tokio::sync::oneshot;
27use tokio::time::Instant;
28
29use crate::batching_mode::task::BatchingTask;
30use crate::batching_mode::time_window::TimeWindowExpr;
31use crate::error::{DatatypesSnafu, InternalSnafu, TimeSnafu, UnexpectedSnafu};
32use crate::metrics::{
33    METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_CNT, METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_SIZE,
34    METRIC_FLOW_BATCHING_ENGINE_STALLED_WINDOW_SIZE,
35};
36use crate::{Error, FlowId};
37
38/// The state of the [`BatchingTask`].
39#[derive(Debug)]
40pub struct TaskState {
41    /// Query context
42    pub(crate) query_ctx: QueryContextRef,
43    /// last query complete time
44    last_update_time: Instant,
45    /// last time query duration
46    last_query_duration: Duration,
47    /// Last successful execution time in unix timestamp milliseconds.
48    last_exec_time_millis: Option<i64>,
49    /// Dirty Time windows need to be updated
50    /// mapping of `start -> end` and non-overlapping
51    pub(crate) dirty_time_windows: DirtyTimeWindows,
52    exec_state: ExecState,
53    /// Shutdown receiver
54    pub(crate) shutdown_rx: oneshot::Receiver<()>,
55    /// Task handle
56    pub(crate) task_handle: Option<tokio::task::JoinHandle<()>>,
57}
58impl TaskState {
59    pub fn new(query_ctx: QueryContextRef, shutdown_rx: oneshot::Receiver<()>) -> Self {
60        Self {
61            query_ctx,
62            last_update_time: Instant::now(),
63            last_query_duration: Duration::from_secs(0),
64            last_exec_time_millis: None,
65            dirty_time_windows: Default::default(),
66            exec_state: ExecState::Idle,
67            shutdown_rx,
68            task_handle: None,
69        }
70    }
71
72    /// called after last query is done
73    /// `is_succ` indicate whether the last query is successful
74    pub fn after_query_exec(&mut self, elapsed: Duration, is_succ: bool) {
75        self.exec_state = ExecState::Idle;
76        self.last_query_duration = elapsed;
77        self.last_update_time = Instant::now();
78        if is_succ {
79            self.last_exec_time_millis = Some(common_time::util::current_time_millis());
80        }
81    }
82
83    pub fn last_execution_time_millis(&self) -> Option<i64> {
84        self.last_exec_time_millis
85    }
86
87    /// Compute the next query delay based on the time window size or the last query duration.
88    /// Aiming to avoid too frequent queries. But also not too long delay.
89    ///
90    /// next wait time is calculated as:
91    /// last query duration, capped by [max(min_run_interval, time_window_size), max_timeout],
92    /// note at most wait for `max_timeout`.
93    ///
94    /// if current the dirty time range is longer than one query can handle,
95    /// execute immediately to faster clean up dirty time windows.
96    ///
97    pub fn get_next_start_query_time(
98        &self,
99        flow_id: FlowId,
100        time_window_size: &Option<Duration>,
101        min_refresh_duration: Duration,
102        max_timeout: Option<Duration>,
103        max_filter_num_per_query: usize,
104    ) -> Instant {
105        // = last query duration, capped by [max(min_run_interval, time_window_size), max_timeout], note at most `max_timeout`
106        let lower = time_window_size.unwrap_or(min_refresh_duration);
107        let next_duration = self.last_query_duration.max(lower);
108        let next_duration = if let Some(max_timeout) = max_timeout {
109            next_duration.min(max_timeout)
110        } else {
111            next_duration
112        };
113
114        let cur_dirty_window_size = self.dirty_time_windows.window_size();
115        // compute how much time range can be handled in one query
116        let max_query_update_range = (*time_window_size)
117            .unwrap_or_default()
118            .mul_f64(max_filter_num_per_query as f64);
119        // if dirty time range is more than one query can handle, execute immediately
120        // to faster clean up dirty time windows
121        if cur_dirty_window_size < max_query_update_range {
122            self.last_update_time + next_duration
123        } else {
124            // if dirty time windows can't be clean up in one query, execute immediately to faster
125            // clean up dirty time windows
126            debug!(
127                "Flow id = {}, still have too many {} dirty time window({:?}), execute immediately",
128                flow_id,
129                self.dirty_time_windows.windows.len(),
130                self.dirty_time_windows.windows
131            );
132            Instant::now()
133        }
134    }
135}
136
137/// For keep recording of dirty time windows, which is time window that have new data inserted
138/// since last query.
139#[derive(Debug, Clone)]
140pub struct DirtyTimeWindows {
141    /// windows's `start -> end` and non-overlapping
142    /// `end` is exclusive(and optional)
143    windows: BTreeMap<Timestamp, Option<Timestamp>>,
144    /// Maximum number of filters allowed in a single query
145    max_filter_num_per_query: usize,
146    /// Time window merge distance
147    ///
148    time_window_merge_threshold: usize,
149}
150
151impl DirtyTimeWindows {
152    pub fn new(max_filter_num_per_query: usize, time_window_merge_threshold: usize) -> Self {
153        Self {
154            windows: BTreeMap::new(),
155            max_filter_num_per_query,
156            time_window_merge_threshold,
157        }
158    }
159}
160
161impl Default for DirtyTimeWindows {
162    fn default() -> Self {
163        Self {
164            windows: BTreeMap::new(),
165            max_filter_num_per_query: 20,
166            time_window_merge_threshold: 3,
167        }
168    }
169}
170
171impl DirtyTimeWindows {
172    /// Time window merge distance
173    ///
174    /// TODO(discord9): make those configurable
175    pub const MERGE_DIST: i32 = 3;
176
177    /// Add lower bounds to the dirty time windows. Upper bounds are ignored.
178    ///
179    /// # Arguments
180    ///
181    /// * `lower_bounds` - An iterator of lower bounds to be added.
182    pub fn add_lower_bounds(&mut self, lower_bounds: impl Iterator<Item = Timestamp>) {
183        for lower_bound in lower_bounds {
184            let entry = self.windows.entry(lower_bound);
185            entry.or_insert(None);
186        }
187    }
188
189    pub fn window_size(&self) -> Duration {
190        let mut ret = Duration::from_secs(0);
191        for (start, end) in &self.windows {
192            if let Some(end) = end
193                && let Some(duration) = end.sub(start)
194            {
195                ret += duration.to_std().unwrap_or_default();
196            }
197        }
198        ret
199    }
200
201    pub fn add_window(&mut self, start: Timestamp, end: Option<Timestamp>) {
202        self.windows.insert(start, end);
203    }
204
205    pub fn add_windows(&mut self, time_ranges: Vec<(Timestamp, Timestamp)>) {
206        for (start, end) in time_ranges {
207            self.windows.insert(start, Some(end));
208        }
209    }
210
211    /// Clean all dirty time windows, useful when can't found time window expr
212    pub fn clean(&mut self) {
213        self.windows.clear();
214    }
215
216    /// Set windows to be dirty, only useful for full aggr without time window
217    /// to mark some new data is inserted
218    pub fn set_dirty(&mut self) {
219        self.windows.insert(Timestamp::new_second(0), None);
220    }
221
222    /// Number of dirty windows.
223    pub fn len(&self) -> usize {
224        self.windows.len()
225    }
226
227    pub fn is_empty(&self) -> bool {
228        self.windows.is_empty()
229    }
230
231    /// Get the effective count of time windows, which is the number of time windows that can be
232    /// used for query, compute from total time window range divided by `window_size`.
233    pub fn effective_count(&self, window_size: &Duration) -> usize {
234        if self.windows.is_empty() {
235            return 0;
236        }
237        let window_size =
238            chrono::Duration::from_std(*window_size).unwrap_or(chrono::Duration::zero());
239        let total_window_time_range =
240            self.windows
241                .iter()
242                .fold(chrono::Duration::zero(), |acc, (start, end)| {
243                    if let Some(end) = end {
244                        acc + end.sub(start).unwrap_or(chrono::Duration::zero())
245                    } else {
246                        acc + window_size
247                    }
248                });
249
250        // not sure window_size is zero have any meaning, but just in case
251        if window_size.num_seconds() == 0 {
252            0
253        } else {
254            (total_window_time_range.num_seconds() / window_size.num_seconds()) as usize
255        }
256    }
257
258    /// Generate all filter expressions consuming all time windows
259    ///
260    /// there is two limits:
261    /// - shouldn't return a too long time range(<=`window_size * window_cnt`), so that the query can be executed in a reasonable time
262    /// - shouldn't return too many time range exprs, so that the query can be parsed properly instead of causing parser to overflow
263    pub fn gen_filter_exprs(
264        &mut self,
265        col_name: &str,
266        expire_lower_bound: Option<Timestamp>,
267        window_size: chrono::Duration,
268        window_cnt: usize,
269        flow_id: FlowId,
270        task_ctx: Option<&BatchingTask>,
271    ) -> Result<Option<FilterExprInfo>, Error> {
272        ensure!(
273            window_size.num_seconds() > 0,
274            UnexpectedSnafu {
275                reason: "window_size is zero, can't generate filter exprs",
276            }
277        );
278
279        debug!(
280            "expire_lower_bound: {:?}, window_size: {:?}",
281            expire_lower_bound.map(|t| t.to_iso8601_string()),
282            window_size
283        );
284        self.merge_dirty_time_windows(window_size, expire_lower_bound)?;
285
286        if self.windows.len() > self.max_filter_num_per_query {
287            let first_time_window = self.windows.first_key_value();
288            let last_time_window = self.windows.last_key_value();
289
290            if let Some(task_ctx) = task_ctx {
291                warn!(
292                    "Flow id = {:?}, too many time windows: {}, only the first {} are taken for this query, the group by expression might be wrong. Time window expr={:?}, expire_after={:?}, first_time_window={:?}, last_time_window={:?}, the original query: {:?}",
293                    task_ctx.config.flow_id,
294                    self.windows.len(),
295                    self.max_filter_num_per_query,
296                    task_ctx.config.time_window_expr,
297                    task_ctx.config.expire_after,
298                    first_time_window,
299                    last_time_window,
300                    task_ctx.config.query
301                );
302            } else {
303                warn!(
304                    "Flow id = {:?}, too many time windows: {}, only the first {} are taken for this query, the group by expression might be wrong. first_time_window={:?}, last_time_window={:?}",
305                    flow_id,
306                    self.windows.len(),
307                    self.max_filter_num_per_query,
308                    first_time_window,
309                    last_time_window
310                )
311            }
312        }
313
314        // get the first `window_cnt` time windows
315        let max_time_range = window_size * window_cnt as i32;
316
317        let mut to_be_query = BTreeMap::new();
318        let mut new_windows = self.windows.clone();
319        let mut cur_time_range = chrono::Duration::zero();
320        for (idx, (start, end)) in self.windows.iter().enumerate() {
321            let first_end = start
322                .add_duration(window_size.to_std().unwrap())
323                .context(TimeSnafu)?;
324            let end = end.unwrap_or(first_end);
325
326            // if time range is too long, stop
327            if cur_time_range >= max_time_range {
328                break;
329            }
330
331            // if we have enough time windows, stop
332            if idx >= window_cnt {
333                break;
334            }
335
336            let Some(x) = end.sub(start) else {
337                continue;
338            };
339            if cur_time_range + x <= max_time_range {
340                to_be_query.insert(*start, Some(end));
341                new_windows.remove(start);
342                cur_time_range += x;
343            } else {
344                // too large a window, split it
345                // split at window_size * times
346                let surplus = max_time_range - cur_time_range;
347                if surplus.num_seconds() <= window_size.num_seconds() {
348                    // Skip splitting if surplus is smaller than window_size
349                    break;
350                }
351                let times = surplus.num_seconds() / window_size.num_seconds();
352
353                let split_offset = window_size * times as i32;
354                let split_at = start
355                    .add_duration(split_offset.to_std().unwrap())
356                    .context(TimeSnafu)?;
357                to_be_query.insert(*start, Some(split_at));
358
359                // remove the original window
360                new_windows.remove(start);
361                new_windows.insert(split_at, Some(end));
362                cur_time_range += split_offset;
363                break;
364            }
365        }
366
367        self.windows = new_windows;
368
369        METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_CNT
370            .with_label_values(&[flow_id.to_string().as_str()])
371            .observe(to_be_query.len() as f64);
372
373        let full_time_range = to_be_query
374            .iter()
375            .fold(chrono::Duration::zero(), |acc, (start, end)| {
376                if let Some(end) = end {
377                    acc + end.sub(start).unwrap_or(chrono::Duration::zero())
378                } else {
379                    acc + window_size
380                }
381            })
382            .num_seconds() as f64;
383        METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_SIZE
384            .with_label_values(&[flow_id.to_string().as_str()])
385            .observe(full_time_range);
386
387        let stalled_time_range =
388            self.windows
389                .iter()
390                .fold(chrono::Duration::zero(), |acc, (start, end)| {
391                    if let Some(end) = end {
392                        acc + end.sub(start).unwrap_or(chrono::Duration::zero())
393                    } else {
394                        acc + window_size
395                    }
396                });
397
398        METRIC_FLOW_BATCHING_ENGINE_STALLED_WINDOW_SIZE
399            .with_label_values(&[flow_id.to_string().as_str()])
400            .observe(stalled_time_range.num_seconds() as f64);
401
402        let std_window_size = window_size.to_std().map_err(|e| {
403            InternalSnafu {
404                reason: e.to_string(),
405            }
406            .build()
407        })?;
408
409        let mut expr_lst = vec![];
410        let mut time_ranges = vec![];
411        for (start, end) in to_be_query.into_iter() {
412            // align using time window exprs
413            let (start, end) = if let Some(ctx) = task_ctx {
414                let Some(time_window_expr) = &ctx.config.time_window_expr else {
415                    UnexpectedSnafu {
416                        reason: "time_window_expr is not set",
417                    }
418                    .fail()?
419                };
420                self.align_time_window(start, end, time_window_expr)?
421            } else {
422                (start, end)
423            };
424            let end = end.unwrap_or(start.add_duration(std_window_size).context(TimeSnafu)?);
425            time_ranges.push((start, end));
426
427            debug!(
428                "Time window start: {:?}, end: {:?}",
429                start.to_iso8601_string(),
430                end.to_iso8601_string()
431            );
432
433            use datafusion_expr::{col, lit};
434            let lower = to_df_literal(start)?;
435            let upper = to_df_literal(end)?;
436            let expr = col(col_name)
437                .gt_eq(lit(lower))
438                .and(col(col_name).lt(lit(upper)));
439            expr_lst.push(expr);
440        }
441        let expr = expr_lst.into_iter().reduce(|a, b| a.or(b));
442        let ret = expr.map(|expr| FilterExprInfo {
443            expr,
444            col_name: col_name.to_string(),
445            time_ranges,
446            window_size,
447        });
448        Ok(ret)
449    }
450
451    fn align_time_window(
452        &self,
453        start: Timestamp,
454        end: Option<Timestamp>,
455        time_window_expr: &TimeWindowExpr,
456    ) -> Result<(Timestamp, Option<Timestamp>), Error> {
457        let align_start = time_window_expr.eval(start)?.0.context(UnexpectedSnafu {
458            reason: format!(
459                "Failed to align start time {:?} with time window expr {:?}",
460                start, time_window_expr
461            ),
462        })?;
463        let align_end = end
464            .and_then(|end| {
465                time_window_expr
466                    .eval(end)
467                    // if after aligned, end is the same, then use end(because it's already aligned) else use aligned end
468                    .map(|r| if r.0 == Some(end) { r.0 } else { r.1 })
469                    .transpose()
470            })
471            .transpose()?;
472        Ok((align_start, align_end))
473    }
474
475    /// Merge time windows that overlaps or get too close
476    ///
477    /// TODO(discord9): not merge and prefer to send smaller time windows? how?
478    pub fn merge_dirty_time_windows(
479        &mut self,
480        window_size: chrono::Duration,
481        expire_lower_bound: Option<Timestamp>,
482    ) -> Result<(), Error> {
483        if self.windows.is_empty() {
484            return Ok(());
485        }
486
487        let mut new_windows = BTreeMap::new();
488
489        let std_window_size = window_size.to_std().map_err(|e| {
490            InternalSnafu {
491                reason: e.to_string(),
492            }
493            .build()
494        })?;
495
496        // previous time window
497        let mut prev_tw = None;
498        for (lower_bound, upper_bound) in std::mem::take(&mut self.windows) {
499            // filter out expired time window
500            if let Some(expire_lower_bound) = expire_lower_bound
501                && lower_bound < expire_lower_bound
502            {
503                continue;
504            }
505
506            let Some(prev_tw) = &mut prev_tw else {
507                prev_tw = Some((lower_bound, upper_bound));
508                continue;
509            };
510
511            // if cur.lower - prev.upper <= window_size * MERGE_DIST, merge
512            // this also deal with overlap windows because cur.lower > prev.lower is always true
513            let prev_upper = prev_tw
514                .1
515                .unwrap_or(prev_tw.0.add_duration(std_window_size).context(TimeSnafu)?);
516            prev_tw.1 = Some(prev_upper);
517
518            let cur_upper = upper_bound.unwrap_or(
519                lower_bound
520                    .add_duration(std_window_size)
521                    .context(TimeSnafu)?,
522            );
523
524            if lower_bound
525                .sub(&prev_upper)
526                .map(|dist| dist <= window_size * self.time_window_merge_threshold as i32)
527                .unwrap_or(false)
528            {
529                prev_tw.1 = Some(cur_upper);
530            } else {
531                new_windows.insert(prev_tw.0, prev_tw.1);
532                *prev_tw = (lower_bound, Some(cur_upper));
533            }
534        }
535
536        if let Some(prev_tw) = prev_tw {
537            new_windows.insert(prev_tw.0, prev_tw.1);
538        }
539
540        self.windows = new_windows;
541
542        Ok(())
543    }
544}
545
546fn to_df_literal(value: Timestamp) -> Result<datafusion_common::ScalarValue, Error> {
547    let value = Value::from(value);
548    let value = value
549        .try_to_scalar_value(&value.data_type())
550        .with_context(|_| DatatypesSnafu {
551            extra: format!("Failed to convert to scalar value: {}", value),
552        })?;
553    Ok(value)
554}
555
/// Execution state of a batching task.
#[derive(Debug, Clone)]
enum ExecState {
    /// Set after a query finishes (see `TaskState::after_query_exec`) and initially.
    Idle,
    /// Presumably set while a query is running — not constructed in this file; confirm.
    Executing,
}
561
562/// Filter Expression's information
563#[derive(Debug, Clone)]
564pub struct FilterExprInfo {
565    pub expr: datafusion_expr::Expr,
566    pub col_name: String,
567    pub time_ranges: Vec<(Timestamp, Timestamp)>,
568    pub window_size: chrono::Duration,
569}
570
571impl FilterExprInfo {
572    pub fn total_window_length(&self) -> chrono::Duration {
573        self.time_ranges
574            .iter()
575            .fold(chrono::Duration::zero(), |acc, (start, end)| {
576                acc + end.sub(start).unwrap_or(chrono::Duration::zero())
577            })
578    }
579}
580
#[cfg(test)]
mod test {
    use pretty_assertions::assert_eq;
    use session::context::QueryContext;

    use super::*;
    use crate::batching_mode::time_window::find_time_window_expr;
    use crate::batching_mode::utils::sql_to_df_plan;
    use crate::test_utils::create_test_query_engine;

    #[test]
    fn test_task_state_records_last_execution_time() {
        let query_ctx = QueryContext::arc();
        let (_tx, rx) = tokio::sync::oneshot::channel();
        let mut state = TaskState::new(query_ctx, rx);

        assert_eq!(None, state.last_execution_time_millis());
        state.after_query_exec(std::time::Duration::from_millis(1), false);
        assert_eq!(None, state.last_execution_time_millis());

        state.after_query_exec(std::time::Duration::from_millis(1), true);
        assert!(state.last_execution_time_millis().is_some());
    }

    #[test]
    fn test_merge_dirty_time_windows() {
        // the threshold the merge logic actually uses, so the cases track its default
        let merge_dist = DirtyTimeWindows::default().time_window_merge_threshold;
        let testcases = vec![
            // just enough to merge
            (
                vec![
                    Timestamp::new_second(0),
                    Timestamp::new_second((1 + merge_dist as i64) * 5 * 60),
                ],
                (chrono::Duration::seconds(5 * 60), None),
                BTreeMap::from([(
                    Timestamp::new_second(0),
                    Some(Timestamp::new_second((2 + merge_dist as i64) * 5 * 60)),
                )]),
                Some(
                    "((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:25:00' AS TIMESTAMP)))",
                ),
            ),
            // separate time window
            (
                vec![
                    Timestamp::new_second(0),
                    Timestamp::new_second((2 + merge_dist as i64) * 5 * 60),
                ],
                (chrono::Duration::seconds(5 * 60), None),
                BTreeMap::from([
                    (
                        Timestamp::new_second(0),
                        Some(Timestamp::new_second(5 * 60)),
                    ),
                    (
                        Timestamp::new_second((2 + merge_dist as i64) * 5 * 60),
                        Some(Timestamp::new_second((3 + merge_dist as i64) * 5 * 60)),
                    ),
                ]),
                Some(
                    "(((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:05:00' AS TIMESTAMP))) OR ((ts >= CAST('1970-01-01 00:25:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:30:00' AS TIMESTAMP))))",
                ),
            ),
            // overlapping
            (
                vec![
                    Timestamp::new_second(0),
                    Timestamp::new_second((merge_dist as i64) * 5 * 60),
                ],
                (chrono::Duration::seconds(5 * 60), None),
                BTreeMap::from([(
                    Timestamp::new_second(0),
                    Some(Timestamp::new_second((1 + merge_dist as i64) * 5 * 60)),
                )]),
                Some(
                    "((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:20:00' AS TIMESTAMP)))",
                ),
            ),
            // complex overlapping
            (
                vec![
                    Timestamp::new_second(0),
                    Timestamp::new_second((merge_dist as i64) * 3),
                    Timestamp::new_second((merge_dist as i64) * 3 * 2),
                ],
                (chrono::Duration::seconds(3), None),
                BTreeMap::from([(
                    Timestamp::new_second(0),
                    Some(Timestamp::new_second((merge_dist as i64) * 7)),
                )]),
                Some(
                    "((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:00:21' AS TIMESTAMP)))",
                ),
            ),
            // split range
            (
                Vec::from_iter((0..20).map(|i| Timestamp::new_second(i * 3)).chain(
                    std::iter::once(Timestamp::new_second(60 + 3 * (merge_dist as i64 + 1))),
                )),
                (chrono::Duration::seconds(3), None),
                BTreeMap::from([
                    (Timestamp::new_second(0), Some(Timestamp::new_second(60))),
                    (
                        Timestamp::new_second(60 + 3 * (merge_dist as i64 + 1)),
                        Some(Timestamp::new_second(60 + 3 * (merge_dist as i64 + 1) + 3)),
                    ),
                ]),
                Some(
                    "((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:01:00' AS TIMESTAMP)))",
                ),
            ),
            // split 2 min into 1 min
            (
                Vec::from_iter((0..40).map(|i| Timestamp::new_second(i * 3))),
                (chrono::Duration::seconds(3), None),
                BTreeMap::from([(
                    Timestamp::new_second(0),
                    Some(Timestamp::new_second(40 * 3)),
                )]),
                Some(
                    "((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:01:00' AS TIMESTAMP)))",
                ),
            ),
            // split 3s + 1min into 3s + 57s
            (
                Vec::from_iter(
                    std::iter::once(Timestamp::new_second(0))
                        .chain((0..40).map(|i| Timestamp::new_second(20 + i * 3))),
                ),
                (chrono::Duration::seconds(3), None),
                BTreeMap::from([
                    (Timestamp::new_second(0), Some(Timestamp::new_second(3))),
                    (Timestamp::new_second(20), Some(Timestamp::new_second(140))),
                ]),
                Some(
                    "(((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:00:03' AS TIMESTAMP))) OR ((ts >= CAST('1970-01-01 00:00:20' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:01:17' AS TIMESTAMP))))",
                ),
            ),
            // expired
            (
                vec![
                    Timestamp::new_second(0),
                    Timestamp::new_second((merge_dist as i64) * 5 * 60),
                ],
                (
                    chrono::Duration::seconds(5 * 60),
                    Some(Timestamp::new_second((merge_dist as i64) * 6 * 60)),
                ),
                BTreeMap::from([]),
                None,
            ),
        ];
        for (lower_bounds, (window_size, expire_lower_bound), expected, expected_filter_expr) in
            testcases
        {
            let mut dirty = DirtyTimeWindows::default();
            dirty.add_lower_bounds(lower_bounds.into_iter());
            dirty
                .merge_dirty_time_windows(window_size, expire_lower_bound)
                .unwrap();
            assert_eq!(expected, dirty.windows);
            // window_cnt = max_filter_num_per_query, flow_id = 0, no task context
            let filter_expr = dirty
                .gen_filter_exprs(
                    "ts",
                    expire_lower_bound,
                    window_size,
                    dirty.max_filter_num_per_query,
                    0,
                    None,
                )
                .unwrap()
                .map(|e| e.expr);

            let unparser = datafusion::sql::unparser::Unparser::default();
            let to_sql = filter_expr
                .as_ref()
                .map(|e| unparser.expr_to_sql(e).unwrap().to_string());
            assert_eq!(expected_filter_expr, to_sql.as_deref());
        }
    }

    #[tokio::test]
    async fn test_align_time_window() {
        type TimeWindow = (Timestamp, Option<Timestamp>);
        struct TestCase {
            sql: String,
            aligns: Vec<(TimeWindow, TimeWindow)>,
        }
        let testcases: Vec<TestCase> = vec![TestCase{
            sql: "SELECT date_bin(INTERVAL '5 second', ts) AS time_window FROM numbers_with_ts GROUP BY time_window;".to_string(),
            aligns: vec![
                ((Timestamp::new_second(3), None), (Timestamp::new_second(0), None)),
                ((Timestamp::new_second(8), None), (Timestamp::new_second(5), None)),
                ((Timestamp::new_second(8), Some(Timestamp::new_second(10))), (Timestamp::new_second(5), Some(Timestamp::new_second(10)))),
                ((Timestamp::new_second(8), Some(Timestamp::new_second(9))), (Timestamp::new_second(5), Some(Timestamp::new_second(10)))),
            ],
        }];

        let query_engine = create_test_query_engine();
        let ctx = QueryContext::arc();
        for TestCase { sql, aligns } in testcases {
            let plan = sql_to_df_plan(ctx.clone(), query_engine.clone(), &sql, true)
                .await
                .unwrap();

            let (column_name, time_window_expr, _, df_schema) = find_time_window_expr(
                &plan,
                query_engine.engine_state().catalog_manager().clone(),
                ctx.clone(),
            )
            .await
            .unwrap();

            let time_window_expr = time_window_expr
                .map(|expr| {
                    TimeWindowExpr::from_expr(
                        &expr,
                        &column_name,
                        &df_schema,
                        &query_engine.engine_state().session_state(),
                    )
                })
                .transpose()
                .unwrap()
                .unwrap();

            let dirty = DirtyTimeWindows::default();
            for (before_align, expected_after_align) in aligns {
                let after_align = dirty
                    .align_time_window(before_align.0, before_align.1, &time_window_expr)
                    .unwrap();
                assert_eq!(expected_after_align, after_align);
            }
        }
    }
}