flow/batching_mode/
state.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Batching mode task state, which changes frequently
16
17use std::collections::BTreeMap;
18use std::time::Duration;
19
20use common_telemetry::debug;
21use common_telemetry::tracing::warn;
22use common_time::Timestamp;
23use datatypes::value::Value;
24use session::context::QueryContextRef;
25use snafu::{ensure, OptionExt, ResultExt};
26use tokio::sync::oneshot;
27use tokio::time::Instant;
28
29use crate::batching_mode::task::BatchingTask;
30use crate::batching_mode::time_window::TimeWindowExpr;
31use crate::error::{DatatypesSnafu, InternalSnafu, TimeSnafu, UnexpectedSnafu};
32use crate::metrics::{
33    METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_CNT, METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_SIZE,
34    METRIC_FLOW_BATCHING_ENGINE_STALLED_WINDOW_SIZE,
35};
36use crate::{Error, FlowId};
37
/// The state of the [`BatchingTask`].
///
/// Tracks when the last query finished, how long it took, and which time windows are still
/// dirty and waiting to be queried.
#[derive(Debug)]
pub struct TaskState {
    /// Query context
    pub(crate) query_ctx: QueryContextRef,
    /// Time at which the last query completed (initialised to the creation time of the state)
    last_update_time: Instant,
    /// How long the last query took
    last_query_duration: Duration,
    /// Dirty time windows that still need to be updated,
    /// kept as a `start -> end` mapping that is intended to be non-overlapping
    pub(crate) dirty_time_windows: DirtyTimeWindows,
    /// Current execution state; reset to `ExecState::Idle` after every query execution
    exec_state: ExecState,
    /// Shutdown receiver
    pub(crate) shutdown_rx: oneshot::Receiver<()>,
    /// Task handle
    pub(crate) task_handle: Option<tokio::task::JoinHandle<()>>,
}
56impl TaskState {
57    pub fn new(query_ctx: QueryContextRef, shutdown_rx: oneshot::Receiver<()>) -> Self {
58        Self {
59            query_ctx,
60            last_update_time: Instant::now(),
61            last_query_duration: Duration::from_secs(0),
62            dirty_time_windows: Default::default(),
63            exec_state: ExecState::Idle,
64            shutdown_rx,
65            task_handle: None,
66        }
67    }
68
69    /// called after last query is done
70    /// `is_succ` indicate whether the last query is successful
71    pub fn after_query_exec(&mut self, elapsed: Duration, _is_succ: bool) {
72        self.exec_state = ExecState::Idle;
73        self.last_query_duration = elapsed;
74        self.last_update_time = Instant::now();
75    }
76
77    /// Compute the next query delay based on the time window size or the last query duration.
78    /// Aiming to avoid too frequent queries. But also not too long delay.
79    ///
80    /// next wait time is calculated as:
81    /// last query duration, capped by [max(min_run_interval, time_window_size), max_timeout],
82    /// note at most wait for `max_timeout`.
83    ///
84    /// if current the dirty time range is longer than one query can handle,
85    /// execute immediately to faster clean up dirty time windows.
86    ///
87    pub fn get_next_start_query_time(
88        &self,
89        flow_id: FlowId,
90        time_window_size: &Option<Duration>,
91        min_refresh_duration: Duration,
92        max_timeout: Option<Duration>,
93        max_filter_num_per_query: usize,
94    ) -> Instant {
95        // = last query duration, capped by [max(min_run_interval, time_window_size), max_timeout], note at most `max_timeout`
96        let lower = time_window_size.unwrap_or(min_refresh_duration);
97        let next_duration = self.last_query_duration.max(lower);
98        let next_duration = if let Some(max_timeout) = max_timeout {
99            next_duration.min(max_timeout)
100        } else {
101            next_duration
102        };
103
104        let cur_dirty_window_size = self.dirty_time_windows.window_size();
105        // compute how much time range can be handled in one query
106        let max_query_update_range = (*time_window_size)
107            .unwrap_or_default()
108            .mul_f64(max_filter_num_per_query as f64);
109        // if dirty time range is more than one query can handle, execute immediately
110        // to faster clean up dirty time windows
111        if cur_dirty_window_size < max_query_update_range {
112            self.last_update_time + next_duration
113        } else {
114            // if dirty time windows can't be clean up in one query, execute immediately to faster
115            // clean up dirty time windows
116            debug!(
117                "Flow id = {}, still have too many {} dirty time window({:?}), execute immediately",
118                flow_id,
119                self.dirty_time_windows.windows.len(),
120                self.dirty_time_windows.windows
121            );
122            Instant::now()
123        }
124    }
125}
126
/// Keeps a record of dirty time windows, i.e. time windows that have had new data inserted
/// since the last query.
#[derive(Debug, Clone)]
pub struct DirtyTimeWindows {
    /// Dirty windows as a `start -> end` mapping, intended to be non-overlapping.
    /// `end` is exclusive and optional: when it is `None`, `start + window_size` is used as
    /// the end wherever a concrete end is needed (merging and filter generation).
    windows: BTreeMap<Timestamp, Option<Timestamp>>,
    /// Maximum number of filters allowed in a single query
    max_filter_num_per_query: usize,
    /// Time window merge distance, in multiples of the window size: a window is merged into
    /// the previous one when the gap between them is at most
    /// `window_size * time_window_merge_threshold`.
    time_window_merge_threshold: usize,
}
140
141impl DirtyTimeWindows {
142    pub fn new(max_filter_num_per_query: usize, time_window_merge_threshold: usize) -> Self {
143        Self {
144            windows: BTreeMap::new(),
145            max_filter_num_per_query,
146            time_window_merge_threshold,
147        }
148    }
149}
150
151impl Default for DirtyTimeWindows {
152    fn default() -> Self {
153        Self {
154            windows: BTreeMap::new(),
155            max_filter_num_per_query: 20,
156            time_window_merge_threshold: 3,
157        }
158    }
159}
160
161impl DirtyTimeWindows {
    /// Time window merge distance, in multiples of the window size.
    ///
    /// NOTE(review): within this file the merge logic reads the per-instance
    /// `time_window_merge_threshold` field rather than this constant; here the constant is
    /// only referenced by the tests.
    ///
    /// TODO(discord9): make those configurable
    pub const MERGE_DIST: i32 = 3;
166
167    /// Add lower bounds to the dirty time windows. Upper bounds are ignored.
168    ///
169    /// # Arguments
170    ///
171    /// * `lower_bounds` - An iterator of lower bounds to be added.
172    pub fn add_lower_bounds(&mut self, lower_bounds: impl Iterator<Item = Timestamp>) {
173        for lower_bound in lower_bounds {
174            let entry = self.windows.entry(lower_bound);
175            entry.or_insert(None);
176        }
177    }
178
179    pub fn window_size(&self) -> Duration {
180        let mut ret = Duration::from_secs(0);
181        for (start, end) in &self.windows {
182            if let Some(end) = end {
183                if let Some(duration) = end.sub(start) {
184                    ret += duration.to_std().unwrap_or_default();
185                }
186            }
187        }
188        ret
189    }
190
191    pub fn add_window(&mut self, start: Timestamp, end: Option<Timestamp>) {
192        self.windows.insert(start, end);
193    }
194
195    pub fn add_windows(&mut self, time_ranges: Vec<(Timestamp, Timestamp)>) {
196        for (start, end) in time_ranges {
197            self.windows.insert(start, Some(end));
198        }
199    }
200
201    /// Clean all dirty time windows, useful when can't found time window expr
202    pub fn clean(&mut self) {
203        self.windows.clear();
204    }
205
206    /// Number of dirty windows.
207    pub fn len(&self) -> usize {
208        self.windows.len()
209    }
210
211    /// Get the effective count of time windows, which is the number of time windows that can be
212    /// used for query, compute from total time window range divided by `window_size`.
213    pub fn effective_count(&self, window_size: &Duration) -> usize {
214        if self.windows.is_empty() {
215            return 0;
216        }
217        let window_size =
218            chrono::Duration::from_std(*window_size).unwrap_or(chrono::Duration::zero());
219        let total_window_time_range =
220            self.windows
221                .iter()
222                .fold(chrono::Duration::zero(), |acc, (start, end)| {
223                    if let Some(end) = end {
224                        acc + end.sub(start).unwrap_or(chrono::Duration::zero())
225                    } else {
226                        acc + window_size
227                    }
228                });
229
230        // not sure window_size is zero have any meaning, but just in case
231        if window_size.num_seconds() == 0 {
232            0
233        } else {
234            (total_window_time_range.num_seconds() / window_size.num_seconds()) as usize
235        }
236    }
237
238    /// Generate all filter expressions consuming all time windows
239    ///
240    /// there is two limits:
241    /// - shouldn't return a too long time range(<=`window_size * window_cnt`), so that the query can be executed in a reasonable time
242    /// - shouldn't return too many time range exprs, so that the query can be parsed properly instead of causing parser to overflow
243    pub fn gen_filter_exprs(
244        &mut self,
245        col_name: &str,
246        expire_lower_bound: Option<Timestamp>,
247        window_size: chrono::Duration,
248        window_cnt: usize,
249        flow_id: FlowId,
250        task_ctx: Option<&BatchingTask>,
251    ) -> Result<Option<FilterExprInfo>, Error> {
252        ensure!(
253            window_size.num_seconds() > 0,
254            UnexpectedSnafu {
255                reason: "window_size is zero, can't generate filter exprs",
256            }
257        );
258
259        debug!(
260            "expire_lower_bound: {:?}, window_size: {:?}",
261            expire_lower_bound.map(|t| t.to_iso8601_string()),
262            window_size
263        );
264        self.merge_dirty_time_windows(window_size, expire_lower_bound)?;
265
266        if self.windows.len() > self.max_filter_num_per_query {
267            let first_time_window = self.windows.first_key_value();
268            let last_time_window = self.windows.last_key_value();
269
270            if let Some(task_ctx) = task_ctx {
271                warn!(
272                "Flow id = {:?}, too many time windows: {}, only the first {} are taken for this query, the group by expression might be wrong. Time window expr={:?}, expire_after={:?}, first_time_window={:?}, last_time_window={:?}, the original query: {:?}",
273                task_ctx.config.flow_id,
274                self.windows.len(),
275                self.max_filter_num_per_query,
276                task_ctx.config.time_window_expr,
277                task_ctx.config.expire_after,
278                first_time_window,
279                last_time_window,
280                task_ctx.config.query
281            );
282            } else {
283                warn!("Flow id = {:?}, too many time windows: {}, only the first {} are taken for this query, the group by expression might be wrong. first_time_window={:?}, last_time_window={:?}",
284                flow_id,
285                self.windows.len(),
286                self.max_filter_num_per_query,
287                first_time_window,
288                last_time_window
289                )
290            }
291        }
292
293        // get the first `window_cnt` time windows
294        let max_time_range = window_size * window_cnt as i32;
295
296        let mut to_be_query = BTreeMap::new();
297        let mut new_windows = self.windows.clone();
298        let mut cur_time_range = chrono::Duration::zero();
299        for (idx, (start, end)) in self.windows.iter().enumerate() {
300            let first_end = start
301                .add_duration(window_size.to_std().unwrap())
302                .context(TimeSnafu)?;
303            let end = end.unwrap_or(first_end);
304
305            // if time range is too long, stop
306            if cur_time_range >= max_time_range {
307                break;
308            }
309
310            // if we have enough time windows, stop
311            if idx >= window_cnt {
312                break;
313            }
314
315            let Some(x) = end.sub(start) else {
316                continue;
317            };
318            if cur_time_range + x <= max_time_range {
319                to_be_query.insert(*start, Some(end));
320                new_windows.remove(start);
321                cur_time_range += x;
322            } else {
323                // too large a window, split it
324                // split at window_size * times
325                let surplus = max_time_range - cur_time_range;
326                if surplus.num_seconds() <= window_size.num_seconds() {
327                    // Skip splitting if surplus is smaller than window_size
328                    break;
329                }
330                let times = surplus.num_seconds() / window_size.num_seconds();
331
332                let split_offset = window_size * times as i32;
333                let split_at = start
334                    .add_duration(split_offset.to_std().unwrap())
335                    .context(TimeSnafu)?;
336                to_be_query.insert(*start, Some(split_at));
337
338                // remove the original window
339                new_windows.remove(start);
340                new_windows.insert(split_at, Some(end));
341                cur_time_range += split_offset;
342                break;
343            }
344        }
345
346        self.windows = new_windows;
347
348        METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_CNT
349            .with_label_values(&[flow_id.to_string().as_str()])
350            .observe(to_be_query.len() as f64);
351
352        let full_time_range = to_be_query
353            .iter()
354            .fold(chrono::Duration::zero(), |acc, (start, end)| {
355                if let Some(end) = end {
356                    acc + end.sub(start).unwrap_or(chrono::Duration::zero())
357                } else {
358                    acc + window_size
359                }
360            })
361            .num_seconds() as f64;
362        METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_SIZE
363            .with_label_values(&[flow_id.to_string().as_str()])
364            .observe(full_time_range);
365
366        let stalled_time_range =
367            self.windows
368                .iter()
369                .fold(chrono::Duration::zero(), |acc, (start, end)| {
370                    if let Some(end) = end {
371                        acc + end.sub(start).unwrap_or(chrono::Duration::zero())
372                    } else {
373                        acc + window_size
374                    }
375                });
376
377        METRIC_FLOW_BATCHING_ENGINE_STALLED_WINDOW_SIZE
378            .with_label_values(&[flow_id.to_string().as_str()])
379            .observe(stalled_time_range.num_seconds() as f64);
380
381        let std_window_size = window_size.to_std().map_err(|e| {
382            InternalSnafu {
383                reason: e.to_string(),
384            }
385            .build()
386        })?;
387
388        let mut expr_lst = vec![];
389        let mut time_ranges = vec![];
390        for (start, end) in to_be_query.into_iter() {
391            // align using time window exprs
392            let (start, end) = if let Some(ctx) = task_ctx {
393                let Some(time_window_expr) = &ctx.config.time_window_expr else {
394                    UnexpectedSnafu {
395                        reason: "time_window_expr is not set",
396                    }
397                    .fail()?
398                };
399                self.align_time_window(start, end, time_window_expr)?
400            } else {
401                (start, end)
402            };
403            let end = end.unwrap_or(start.add_duration(std_window_size).context(TimeSnafu)?);
404            time_ranges.push((start, end));
405
406            debug!(
407                "Time window start: {:?}, end: {:?}",
408                start.to_iso8601_string(),
409                end.to_iso8601_string()
410            );
411
412            use datafusion_expr::{col, lit};
413            let lower = to_df_literal(start)?;
414            let upper = to_df_literal(end)?;
415            let expr = col(col_name)
416                .gt_eq(lit(lower))
417                .and(col(col_name).lt(lit(upper)));
418            expr_lst.push(expr);
419        }
420        let expr = expr_lst.into_iter().reduce(|a, b| a.or(b));
421        let ret = expr.map(|expr| FilterExprInfo {
422            expr,
423            col_name: col_name.to_string(),
424            time_ranges,
425            window_size,
426        });
427        Ok(ret)
428    }
429
430    fn align_time_window(
431        &self,
432        start: Timestamp,
433        end: Option<Timestamp>,
434        time_window_expr: &TimeWindowExpr,
435    ) -> Result<(Timestamp, Option<Timestamp>), Error> {
436        let align_start = time_window_expr.eval(start)?.0.context(UnexpectedSnafu {
437            reason: format!(
438                "Failed to align start time {:?} with time window expr {:?}",
439                start, time_window_expr
440            ),
441        })?;
442        let align_end = end
443            .and_then(|end| {
444                time_window_expr
445                    .eval(end)
446                    // if after aligned, end is the same, then use end(because it's already aligned) else use aligned end
447                    .map(|r| if r.0 == Some(end) { r.0 } else { r.1 })
448                    .transpose()
449            })
450            .transpose()?;
451        Ok((align_start, align_end))
452    }
453
454    /// Merge time windows that overlaps or get too close
455    ///
456    /// TODO(discord9): not merge and prefer to send smaller time windows? how?
457    pub fn merge_dirty_time_windows(
458        &mut self,
459        window_size: chrono::Duration,
460        expire_lower_bound: Option<Timestamp>,
461    ) -> Result<(), Error> {
462        if self.windows.is_empty() {
463            return Ok(());
464        }
465
466        let mut new_windows = BTreeMap::new();
467
468        let std_window_size = window_size.to_std().map_err(|e| {
469            InternalSnafu {
470                reason: e.to_string(),
471            }
472            .build()
473        })?;
474
475        // previous time window
476        let mut prev_tw = None;
477        for (lower_bound, upper_bound) in std::mem::take(&mut self.windows) {
478            // filter out expired time window
479            if let Some(expire_lower_bound) = expire_lower_bound {
480                if lower_bound < expire_lower_bound {
481                    continue;
482                }
483            }
484
485            let Some(prev_tw) = &mut prev_tw else {
486                prev_tw = Some((lower_bound, upper_bound));
487                continue;
488            };
489
490            // if cur.lower - prev.upper <= window_size * MERGE_DIST, merge
491            // this also deal with overlap windows because cur.lower > prev.lower is always true
492            let prev_upper = prev_tw
493                .1
494                .unwrap_or(prev_tw.0.add_duration(std_window_size).context(TimeSnafu)?);
495            prev_tw.1 = Some(prev_upper);
496
497            let cur_upper = upper_bound.unwrap_or(
498                lower_bound
499                    .add_duration(std_window_size)
500                    .context(TimeSnafu)?,
501            );
502
503            if lower_bound
504                .sub(&prev_upper)
505                .map(|dist| dist <= window_size * self.time_window_merge_threshold as i32)
506                .unwrap_or(false)
507            {
508                prev_tw.1 = Some(cur_upper);
509            } else {
510                new_windows.insert(prev_tw.0, prev_tw.1);
511                *prev_tw = (lower_bound, Some(cur_upper));
512            }
513        }
514
515        if let Some(prev_tw) = prev_tw {
516            new_windows.insert(prev_tw.0, prev_tw.1);
517        }
518
519        self.windows = new_windows;
520
521        Ok(())
522    }
523}
524
525fn to_df_literal(value: Timestamp) -> Result<datafusion_common::ScalarValue, Error> {
526    let value = Value::from(value);
527    let value = value
528        .try_to_scalar_value(&value.data_type())
529        .with_context(|_| DatatypesSnafu {
530            extra: format!("Failed to convert to scalar value: {}", value),
531        })?;
532    Ok(value)
533}
534
/// Execution state of a batching task.
#[derive(Debug, Clone)]
enum ExecState {
    /// No query is running; set on creation and again after every query execution.
    Idle,
    /// NOTE(review): presumably means a query is currently running — this variant is not
    /// constructed anywhere in this file, confirm against the callers.
    Executing,
}
540
/// Filter Expression's information
///
/// The field descriptions below reflect how `DirtyTimeWindows::gen_filter_exprs` fills
/// this struct.
#[derive(Debug, Clone)]
pub struct FilterExprInfo {
    /// The `OR` of one `col >= start AND col < end` predicate per time range
    pub expr: datafusion_expr::Expr,
    /// Name of the column the predicates are applied to
    pub col_name: String,
    /// The `(start, end)` time ranges covered by `expr`, `end` being exclusive
    pub time_ranges: Vec<(Timestamp, Timestamp)>,
    /// The window size the filter was generated with
    pub window_size: chrono::Duration,
}
549
550impl FilterExprInfo {
551    pub fn total_window_length(&self) -> chrono::Duration {
552        self.time_ranges
553            .iter()
554            .fold(chrono::Duration::zero(), |acc, (start, end)| {
555                acc + end.sub(start).unwrap_or(chrono::Duration::zero())
556            })
557    }
558}
559
560#[cfg(test)]
561mod test {
562    use pretty_assertions::assert_eq;
563    use session::context::QueryContext;
564
565    use super::*;
566    use crate::batching_mode::time_window::find_time_window_expr;
567    use crate::batching_mode::utils::sql_to_df_plan;
568    use crate::test_utils::create_test_query_engine;
569
    /// Table-driven test: each case adds lower bounds, merges them and checks both the
    /// merged windows and the SQL form of the generated filter expression.
    #[test]
    fn test_merge_dirty_time_windows() {
        let merge_dist = DirtyTimeWindows::default().time_window_merge_threshold;
        // each case is:
        // (lower bounds, (window size, expire lower bound), expected merged windows, expected filter sql)
        let testcases = vec![
            // just enough to merge
            (
                vec![
                    Timestamp::new_second(0),
                    Timestamp::new_second((1 + merge_dist as i64) * 5 * 60),
                ],
                (chrono::Duration::seconds(5 * 60), None),
                BTreeMap::from([(
                    Timestamp::new_second(0),
                    Some(Timestamp::new_second(
                        (2 + merge_dist as i64) * 5 * 60,
                    )),
                )]),
                Some(
                    "((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:25:00' AS TIMESTAMP)))",
                )
            ),
            // separate time window
            (
                vec![
                    Timestamp::new_second(0),
                    Timestamp::new_second((2 + merge_dist as i64) * 5 * 60),
                ],
                (chrono::Duration::seconds(5 * 60), None),
                BTreeMap::from([
                    (
                        Timestamp::new_second(0),
                        Some(Timestamp::new_second(5 * 60)),
                    ),
                    (
                        Timestamp::new_second((2 + merge_dist as i64) * 5 * 60),
                        Some(Timestamp::new_second(
                            (3 + merge_dist as i64) * 5 * 60,
                        )),
                    ),
                ]),
                Some(
                    "(((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:05:00' AS TIMESTAMP))) OR ((ts >= CAST('1970-01-01 00:25:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:30:00' AS TIMESTAMP))))",
                )
            ),
            // overlapping
            (
                vec![
                    Timestamp::new_second(0),
                    Timestamp::new_second((merge_dist as i64) * 5 * 60),
                ],
                (chrono::Duration::seconds(5 * 60), None),
                BTreeMap::from([(
                    Timestamp::new_second(0),
                    Some(Timestamp::new_second(
                        (1 + merge_dist as i64) * 5 * 60,
                    )),
                )]),
                Some(
                    "((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:20:00' AS TIMESTAMP)))",
                )
            ),
            // complex overlapping
            (
                vec![
                    Timestamp::new_second(0),
                    Timestamp::new_second((merge_dist as i64) * 3),
                    Timestamp::new_second((merge_dist as i64) * 3 * 2),
                ],
                (chrono::Duration::seconds(3), None),
                BTreeMap::from([(
                    Timestamp::new_second(0),
                    Some(Timestamp::new_second(
                        (merge_dist as i64) * 7
                    )),
                )]),
                Some(
                    "((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:00:21' AS TIMESTAMP)))",
                )
            ),
            // split range
            (
                Vec::from_iter((0..20).map(|i|Timestamp::new_second(i*3)).chain(std::iter::once(
                    Timestamp::new_second(60 + 3 * (DirtyTimeWindows::MERGE_DIST as i64 + 1)),
                ))),
                (chrono::Duration::seconds(3), None),
                BTreeMap::from([
                (
                    Timestamp::new_second(0),
                    Some(Timestamp::new_second(
                        60
                    )),
                ),
                (
                    Timestamp::new_second(60 + 3 * (DirtyTimeWindows::MERGE_DIST as i64 + 1)),
                    Some(Timestamp::new_second(
                        60 + 3 * (DirtyTimeWindows::MERGE_DIST as i64 + 1) + 3
                    )),
                )]),
                Some(
                    "((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:01:00' AS TIMESTAMP)))",
                )
            ),
            // split 2 min into 1 min
            (
                Vec::from_iter((0..40).map(|i|Timestamp::new_second(i*3))),
                (chrono::Duration::seconds(3), None),
                BTreeMap::from([
                (
                    Timestamp::new_second(0),
                    Some(Timestamp::new_second(
                        40 * 3
                    )),
                )]),
                Some(
                    "((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:01:00' AS TIMESTAMP)))",
                )
            ),
            // split 3s + 1min into 3s + 57s
            (
                Vec::from_iter(std::iter::once(Timestamp::new_second(0)).chain((0..40).map(|i|Timestamp::new_second(20+i*3)))),
                (chrono::Duration::seconds(3), None),
                BTreeMap::from([
                (
                    Timestamp::new_second(0),
                    Some(Timestamp::new_second(
                        3
                    )),
                ),(
                    Timestamp::new_second(20),
                    Some(Timestamp::new_second(
                        140
                    )),
                )]),
                Some(
                    "(((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:00:03' AS TIMESTAMP))) OR ((ts >= CAST('1970-01-01 00:00:20' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:01:17' AS TIMESTAMP))))",
                )
            ),
            // expired
            (
                vec![
                    Timestamp::new_second(0),
                    Timestamp::new_second((merge_dist as i64) * 5 * 60),
                ],
                (
                    chrono::Duration::seconds(5 * 60),
                    Some(Timestamp::new_second(
                        (merge_dist as i64) * 6 * 60,
                    )),
                ),
                BTreeMap::from([]),
                None
            ),
        ];
        for (lower_bounds, (window_size, expire_lower_bound), expected, expected_filter_expr) in
            testcases
        {
            let mut dirty = DirtyTimeWindows::default();
            dirty.add_lower_bounds(lower_bounds.into_iter());
            // first check the merged windows on their own ...
            dirty
                .merge_dirty_time_windows(window_size, expire_lower_bound)
                .unwrap();
            assert_eq!(expected, dirty.windows);
            // ... then the filter generated from them (no task context, so no alignment)
            let filter_expr = dirty
                .gen_filter_exprs(
                    "ts",
                    expire_lower_bound,
                    window_size,
                    dirty.max_filter_num_per_query,
                    0,
                    None,
                )
                .unwrap()
                .map(|e| e.expr);

            // compare the filter through its SQL text
            let unparser = datafusion::sql::unparser::Unparser::default();
            let to_sql = filter_expr
                .as_ref()
                .map(|e| unparser.expr_to_sql(e).unwrap().to_string());
            assert_eq!(expected_filter_expr, to_sql.as_deref());
        }
    }
753
    /// Checks that `align_time_window` snaps windows to the boundaries of the time window
    /// expr found in the query.
    #[tokio::test]
    async fn test_align_time_window() {
        type TimeWindow = (Timestamp, Option<Timestamp>);
        struct TestCase {
            /// query to extract the time window expr from
            sql: String,
            /// pairs of (window before alignment, expected window after alignment)
            aligns: Vec<(TimeWindow, TimeWindow)>,
        }
        let testcases: Vec<TestCase> = vec![TestCase{
            sql: "SELECT date_bin(INTERVAL '5 second', ts) AS time_window FROM numbers_with_ts GROUP BY time_window;".to_string(),
            aligns: vec![
                ((Timestamp::new_second(3), None), (Timestamp::new_second(0), None)),
                ((Timestamp::new_second(8), None), (Timestamp::new_second(5), None)),
                ((Timestamp::new_second(8), Some(Timestamp::new_second(10))), (Timestamp::new_second(5), Some(Timestamp::new_second(10)))),
                ((Timestamp::new_second(8), Some(Timestamp::new_second(9))), (Timestamp::new_second(5), Some(Timestamp::new_second(10)))),
            ],
        }];

        let query_engine = create_test_query_engine();
        let ctx = QueryContext::arc();
        for TestCase { sql, aligns } in testcases {
            let plan = sql_to_df_plan(ctx.clone(), query_engine.clone(), &sql, true)
                .await
                .unwrap();

            let (column_name, time_window_expr, _, df_schema) = find_time_window_expr(
                &plan,
                query_engine.engine_state().catalog_manager().clone(),
                ctx.clone(),
            )
            .await
            .unwrap();

            // the query must contain a time window expr, hence the double unwrap
            let time_window_expr = time_window_expr
                .map(|expr| {
                    TimeWindowExpr::from_expr(
                        &expr,
                        &column_name,
                        &df_schema,
                        &query_engine.engine_state().session_state(),
                    )
                })
                .transpose()
                .unwrap()
                .unwrap();

            let dirty = DirtyTimeWindows::default();
            for (before_align, expected_after_align) in aligns {
                let after_align = dirty
                    .align_time_window(before_align.0, before_align.1, &time_window_expr)
                    .unwrap();
                assert_eq!(expected_after_align, after_align);
            }
        }
    }
808}