flow/batching_mode/
state.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Batching mode task state, which changes frequently
16
17use std::collections::BTreeMap;
18use std::time::Duration;
19
20use common_telemetry::debug;
21use common_telemetry::tracing::warn;
22use common_time::Timestamp;
23use datatypes::value::Value;
24use session::context::QueryContextRef;
25use snafu::{OptionExt, ResultExt, ensure};
26use tokio::sync::oneshot;
27use tokio::time::Instant;
28
29use crate::batching_mode::task::BatchingTask;
30use crate::batching_mode::time_window::TimeWindowExpr;
31use crate::error::{DatatypesSnafu, InternalSnafu, TimeSnafu, UnexpectedSnafu};
32use crate::metrics::{
33    METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_CNT, METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_SIZE,
34    METRIC_FLOW_BATCHING_ENGINE_STALLED_WINDOW_SIZE,
35};
36use crate::{Error, FlowId};
37
38/// The state of the [`BatchingTask`].
39#[derive(Debug)]
40pub struct TaskState {
41    /// Query context
42    pub(crate) query_ctx: QueryContextRef,
43    /// last query complete time
44    last_update_time: Instant,
45    /// last time query duration
46    last_query_duration: Duration,
47    /// Dirty Time windows need to be updated
48    /// mapping of `start -> end` and non-overlapping
49    pub(crate) dirty_time_windows: DirtyTimeWindows,
50    exec_state: ExecState,
51    /// Shutdown receiver
52    pub(crate) shutdown_rx: oneshot::Receiver<()>,
53    /// Task handle
54    pub(crate) task_handle: Option<tokio::task::JoinHandle<()>>,
55}
56impl TaskState {
57    pub fn new(query_ctx: QueryContextRef, shutdown_rx: oneshot::Receiver<()>) -> Self {
58        Self {
59            query_ctx,
60            last_update_time: Instant::now(),
61            last_query_duration: Duration::from_secs(0),
62            dirty_time_windows: Default::default(),
63            exec_state: ExecState::Idle,
64            shutdown_rx,
65            task_handle: None,
66        }
67    }
68
69    /// called after last query is done
70    /// `is_succ` indicate whether the last query is successful
71    pub fn after_query_exec(&mut self, elapsed: Duration, _is_succ: bool) {
72        self.exec_state = ExecState::Idle;
73        self.last_query_duration = elapsed;
74        self.last_update_time = Instant::now();
75    }
76
77    /// Compute the next query delay based on the time window size or the last query duration.
78    /// Aiming to avoid too frequent queries. But also not too long delay.
79    ///
80    /// next wait time is calculated as:
81    /// last query duration, capped by [max(min_run_interval, time_window_size), max_timeout],
82    /// note at most wait for `max_timeout`.
83    ///
84    /// if current the dirty time range is longer than one query can handle,
85    /// execute immediately to faster clean up dirty time windows.
86    ///
87    pub fn get_next_start_query_time(
88        &self,
89        flow_id: FlowId,
90        time_window_size: &Option<Duration>,
91        min_refresh_duration: Duration,
92        max_timeout: Option<Duration>,
93        max_filter_num_per_query: usize,
94    ) -> Instant {
95        // = last query duration, capped by [max(min_run_interval, time_window_size), max_timeout], note at most `max_timeout`
96        let lower = time_window_size.unwrap_or(min_refresh_duration);
97        let next_duration = self.last_query_duration.max(lower);
98        let next_duration = if let Some(max_timeout) = max_timeout {
99            next_duration.min(max_timeout)
100        } else {
101            next_duration
102        };
103
104        let cur_dirty_window_size = self.dirty_time_windows.window_size();
105        // compute how much time range can be handled in one query
106        let max_query_update_range = (*time_window_size)
107            .unwrap_or_default()
108            .mul_f64(max_filter_num_per_query as f64);
109        // if dirty time range is more than one query can handle, execute immediately
110        // to faster clean up dirty time windows
111        if cur_dirty_window_size < max_query_update_range {
112            self.last_update_time + next_duration
113        } else {
114            // if dirty time windows can't be clean up in one query, execute immediately to faster
115            // clean up dirty time windows
116            debug!(
117                "Flow id = {}, still have too many {} dirty time window({:?}), execute immediately",
118                flow_id,
119                self.dirty_time_windows.windows.len(),
120                self.dirty_time_windows.windows
121            );
122            Instant::now()
123        }
124    }
125}
126
127/// For keep recording of dirty time windows, which is time window that have new data inserted
128/// since last query.
129#[derive(Debug, Clone)]
130pub struct DirtyTimeWindows {
131    /// windows's `start -> end` and non-overlapping
132    /// `end` is exclusive(and optional)
133    windows: BTreeMap<Timestamp, Option<Timestamp>>,
134    /// Maximum number of filters allowed in a single query
135    max_filter_num_per_query: usize,
136    /// Time window merge distance
137    ///
138    time_window_merge_threshold: usize,
139}
140
141impl DirtyTimeWindows {
142    pub fn new(max_filter_num_per_query: usize, time_window_merge_threshold: usize) -> Self {
143        Self {
144            windows: BTreeMap::new(),
145            max_filter_num_per_query,
146            time_window_merge_threshold,
147        }
148    }
149}
150
151impl Default for DirtyTimeWindows {
152    fn default() -> Self {
153        Self {
154            windows: BTreeMap::new(),
155            max_filter_num_per_query: 20,
156            time_window_merge_threshold: 3,
157        }
158    }
159}
160
161impl DirtyTimeWindows {
162    /// Time window merge distance
163    ///
164    /// TODO(discord9): make those configurable
165    pub const MERGE_DIST: i32 = 3;
166
167    /// Add lower bounds to the dirty time windows. Upper bounds are ignored.
168    ///
169    /// # Arguments
170    ///
171    /// * `lower_bounds` - An iterator of lower bounds to be added.
172    pub fn add_lower_bounds(&mut self, lower_bounds: impl Iterator<Item = Timestamp>) {
173        for lower_bound in lower_bounds {
174            let entry = self.windows.entry(lower_bound);
175            entry.or_insert(None);
176        }
177    }
178
179    pub fn window_size(&self) -> Duration {
180        let mut ret = Duration::from_secs(0);
181        for (start, end) in &self.windows {
182            if let Some(end) = end
183                && let Some(duration) = end.sub(start)
184            {
185                ret += duration.to_std().unwrap_or_default();
186            }
187        }
188        ret
189    }
190
191    pub fn add_window(&mut self, start: Timestamp, end: Option<Timestamp>) {
192        self.windows.insert(start, end);
193    }
194
195    pub fn add_windows(&mut self, time_ranges: Vec<(Timestamp, Timestamp)>) {
196        for (start, end) in time_ranges {
197            self.windows.insert(start, Some(end));
198        }
199    }
200
201    /// Clean all dirty time windows, useful when can't found time window expr
202    pub fn clean(&mut self) {
203        self.windows.clear();
204    }
205
206    /// Set windows to be dirty, only useful for full aggr without time window
207    /// to mark some new data is inserted
208    pub fn set_dirty(&mut self) {
209        self.windows.insert(Timestamp::new_second(0), None);
210    }
211
212    /// Number of dirty windows.
213    pub fn len(&self) -> usize {
214        self.windows.len()
215    }
216
217    pub fn is_empty(&self) -> bool {
218        self.windows.is_empty()
219    }
220
221    /// Get the effective count of time windows, which is the number of time windows that can be
222    /// used for query, compute from total time window range divided by `window_size`.
223    pub fn effective_count(&self, window_size: &Duration) -> usize {
224        if self.windows.is_empty() {
225            return 0;
226        }
227        let window_size =
228            chrono::Duration::from_std(*window_size).unwrap_or(chrono::Duration::zero());
229        let total_window_time_range =
230            self.windows
231                .iter()
232                .fold(chrono::Duration::zero(), |acc, (start, end)| {
233                    if let Some(end) = end {
234                        acc + end.sub(start).unwrap_or(chrono::Duration::zero())
235                    } else {
236                        acc + window_size
237                    }
238                });
239
240        // not sure window_size is zero have any meaning, but just in case
241        if window_size.num_seconds() == 0 {
242            0
243        } else {
244            (total_window_time_range.num_seconds() / window_size.num_seconds()) as usize
245        }
246    }
247
248    /// Generate all filter expressions consuming all time windows
249    ///
250    /// there is two limits:
251    /// - shouldn't return a too long time range(<=`window_size * window_cnt`), so that the query can be executed in a reasonable time
252    /// - shouldn't return too many time range exprs, so that the query can be parsed properly instead of causing parser to overflow
253    pub fn gen_filter_exprs(
254        &mut self,
255        col_name: &str,
256        expire_lower_bound: Option<Timestamp>,
257        window_size: chrono::Duration,
258        window_cnt: usize,
259        flow_id: FlowId,
260        task_ctx: Option<&BatchingTask>,
261    ) -> Result<Option<FilterExprInfo>, Error> {
262        ensure!(
263            window_size.num_seconds() > 0,
264            UnexpectedSnafu {
265                reason: "window_size is zero, can't generate filter exprs",
266            }
267        );
268
269        debug!(
270            "expire_lower_bound: {:?}, window_size: {:?}",
271            expire_lower_bound.map(|t| t.to_iso8601_string()),
272            window_size
273        );
274        self.merge_dirty_time_windows(window_size, expire_lower_bound)?;
275
276        if self.windows.len() > self.max_filter_num_per_query {
277            let first_time_window = self.windows.first_key_value();
278            let last_time_window = self.windows.last_key_value();
279
280            if let Some(task_ctx) = task_ctx {
281                warn!(
282                    "Flow id = {:?}, too many time windows: {}, only the first {} are taken for this query, the group by expression might be wrong. Time window expr={:?}, expire_after={:?}, first_time_window={:?}, last_time_window={:?}, the original query: {:?}",
283                    task_ctx.config.flow_id,
284                    self.windows.len(),
285                    self.max_filter_num_per_query,
286                    task_ctx.config.time_window_expr,
287                    task_ctx.config.expire_after,
288                    first_time_window,
289                    last_time_window,
290                    task_ctx.config.query
291                );
292            } else {
293                warn!(
294                    "Flow id = {:?}, too many time windows: {}, only the first {} are taken for this query, the group by expression might be wrong. first_time_window={:?}, last_time_window={:?}",
295                    flow_id,
296                    self.windows.len(),
297                    self.max_filter_num_per_query,
298                    first_time_window,
299                    last_time_window
300                )
301            }
302        }
303
304        // get the first `window_cnt` time windows
305        let max_time_range = window_size * window_cnt as i32;
306
307        let mut to_be_query = BTreeMap::new();
308        let mut new_windows = self.windows.clone();
309        let mut cur_time_range = chrono::Duration::zero();
310        for (idx, (start, end)) in self.windows.iter().enumerate() {
311            let first_end = start
312                .add_duration(window_size.to_std().unwrap())
313                .context(TimeSnafu)?;
314            let end = end.unwrap_or(first_end);
315
316            // if time range is too long, stop
317            if cur_time_range >= max_time_range {
318                break;
319            }
320
321            // if we have enough time windows, stop
322            if idx >= window_cnt {
323                break;
324            }
325
326            let Some(x) = end.sub(start) else {
327                continue;
328            };
329            if cur_time_range + x <= max_time_range {
330                to_be_query.insert(*start, Some(end));
331                new_windows.remove(start);
332                cur_time_range += x;
333            } else {
334                // too large a window, split it
335                // split at window_size * times
336                let surplus = max_time_range - cur_time_range;
337                if surplus.num_seconds() <= window_size.num_seconds() {
338                    // Skip splitting if surplus is smaller than window_size
339                    break;
340                }
341                let times = surplus.num_seconds() / window_size.num_seconds();
342
343                let split_offset = window_size * times as i32;
344                let split_at = start
345                    .add_duration(split_offset.to_std().unwrap())
346                    .context(TimeSnafu)?;
347                to_be_query.insert(*start, Some(split_at));
348
349                // remove the original window
350                new_windows.remove(start);
351                new_windows.insert(split_at, Some(end));
352                cur_time_range += split_offset;
353                break;
354            }
355        }
356
357        self.windows = new_windows;
358
359        METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_CNT
360            .with_label_values(&[flow_id.to_string().as_str()])
361            .observe(to_be_query.len() as f64);
362
363        let full_time_range = to_be_query
364            .iter()
365            .fold(chrono::Duration::zero(), |acc, (start, end)| {
366                if let Some(end) = end {
367                    acc + end.sub(start).unwrap_or(chrono::Duration::zero())
368                } else {
369                    acc + window_size
370                }
371            })
372            .num_seconds() as f64;
373        METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_SIZE
374            .with_label_values(&[flow_id.to_string().as_str()])
375            .observe(full_time_range);
376
377        let stalled_time_range =
378            self.windows
379                .iter()
380                .fold(chrono::Duration::zero(), |acc, (start, end)| {
381                    if let Some(end) = end {
382                        acc + end.sub(start).unwrap_or(chrono::Duration::zero())
383                    } else {
384                        acc + window_size
385                    }
386                });
387
388        METRIC_FLOW_BATCHING_ENGINE_STALLED_WINDOW_SIZE
389            .with_label_values(&[flow_id.to_string().as_str()])
390            .observe(stalled_time_range.num_seconds() as f64);
391
392        let std_window_size = window_size.to_std().map_err(|e| {
393            InternalSnafu {
394                reason: e.to_string(),
395            }
396            .build()
397        })?;
398
399        let mut expr_lst = vec![];
400        let mut time_ranges = vec![];
401        for (start, end) in to_be_query.into_iter() {
402            // align using time window exprs
403            let (start, end) = if let Some(ctx) = task_ctx {
404                let Some(time_window_expr) = &ctx.config.time_window_expr else {
405                    UnexpectedSnafu {
406                        reason: "time_window_expr is not set",
407                    }
408                    .fail()?
409                };
410                self.align_time_window(start, end, time_window_expr)?
411            } else {
412                (start, end)
413            };
414            let end = end.unwrap_or(start.add_duration(std_window_size).context(TimeSnafu)?);
415            time_ranges.push((start, end));
416
417            debug!(
418                "Time window start: {:?}, end: {:?}",
419                start.to_iso8601_string(),
420                end.to_iso8601_string()
421            );
422
423            use datafusion_expr::{col, lit};
424            let lower = to_df_literal(start)?;
425            let upper = to_df_literal(end)?;
426            let expr = col(col_name)
427                .gt_eq(lit(lower))
428                .and(col(col_name).lt(lit(upper)));
429            expr_lst.push(expr);
430        }
431        let expr = expr_lst.into_iter().reduce(|a, b| a.or(b));
432        let ret = expr.map(|expr| FilterExprInfo {
433            expr,
434            col_name: col_name.to_string(),
435            time_ranges,
436            window_size,
437        });
438        Ok(ret)
439    }
440
441    fn align_time_window(
442        &self,
443        start: Timestamp,
444        end: Option<Timestamp>,
445        time_window_expr: &TimeWindowExpr,
446    ) -> Result<(Timestamp, Option<Timestamp>), Error> {
447        let align_start = time_window_expr.eval(start)?.0.context(UnexpectedSnafu {
448            reason: format!(
449                "Failed to align start time {:?} with time window expr {:?}",
450                start, time_window_expr
451            ),
452        })?;
453        let align_end = end
454            .and_then(|end| {
455                time_window_expr
456                    .eval(end)
457                    // if after aligned, end is the same, then use end(because it's already aligned) else use aligned end
458                    .map(|r| if r.0 == Some(end) { r.0 } else { r.1 })
459                    .transpose()
460            })
461            .transpose()?;
462        Ok((align_start, align_end))
463    }
464
465    /// Merge time windows that overlaps or get too close
466    ///
467    /// TODO(discord9): not merge and prefer to send smaller time windows? how?
468    pub fn merge_dirty_time_windows(
469        &mut self,
470        window_size: chrono::Duration,
471        expire_lower_bound: Option<Timestamp>,
472    ) -> Result<(), Error> {
473        if self.windows.is_empty() {
474            return Ok(());
475        }
476
477        let mut new_windows = BTreeMap::new();
478
479        let std_window_size = window_size.to_std().map_err(|e| {
480            InternalSnafu {
481                reason: e.to_string(),
482            }
483            .build()
484        })?;
485
486        // previous time window
487        let mut prev_tw = None;
488        for (lower_bound, upper_bound) in std::mem::take(&mut self.windows) {
489            // filter out expired time window
490            if let Some(expire_lower_bound) = expire_lower_bound
491                && lower_bound < expire_lower_bound
492            {
493                continue;
494            }
495
496            let Some(prev_tw) = &mut prev_tw else {
497                prev_tw = Some((lower_bound, upper_bound));
498                continue;
499            };
500
501            // if cur.lower - prev.upper <= window_size * MERGE_DIST, merge
502            // this also deal with overlap windows because cur.lower > prev.lower is always true
503            let prev_upper = prev_tw
504                .1
505                .unwrap_or(prev_tw.0.add_duration(std_window_size).context(TimeSnafu)?);
506            prev_tw.1 = Some(prev_upper);
507
508            let cur_upper = upper_bound.unwrap_or(
509                lower_bound
510                    .add_duration(std_window_size)
511                    .context(TimeSnafu)?,
512            );
513
514            if lower_bound
515                .sub(&prev_upper)
516                .map(|dist| dist <= window_size * self.time_window_merge_threshold as i32)
517                .unwrap_or(false)
518            {
519                prev_tw.1 = Some(cur_upper);
520            } else {
521                new_windows.insert(prev_tw.0, prev_tw.1);
522                *prev_tw = (lower_bound, Some(cur_upper));
523            }
524        }
525
526        if let Some(prev_tw) = prev_tw {
527            new_windows.insert(prev_tw.0, prev_tw.1);
528        }
529
530        self.windows = new_windows;
531
532        Ok(())
533    }
534}
535
536fn to_df_literal(value: Timestamp) -> Result<datafusion_common::ScalarValue, Error> {
537    let value = Value::from(value);
538    let value = value
539        .try_to_scalar_value(&value.data_type())
540        .with_context(|_| DatatypesSnafu {
541            extra: format!("Failed to convert to scalar value: {}", value),
542        })?;
543    Ok(value)
544}
545
/// Execution state of a task's query.
#[derive(Clone, Debug)]
enum ExecState {
    /// No query is running.
    Idle,
    /// A query is running.
    Executing,
}
551
552/// Filter Expression's information
553#[derive(Debug, Clone)]
554pub struct FilterExprInfo {
555    pub expr: datafusion_expr::Expr,
556    pub col_name: String,
557    pub time_ranges: Vec<(Timestamp, Timestamp)>,
558    pub window_size: chrono::Duration,
559}
560
561impl FilterExprInfo {
562    pub fn total_window_length(&self) -> chrono::Duration {
563        self.time_ranges
564            .iter()
565            .fold(chrono::Duration::zero(), |acc, (start, end)| {
566                acc + end.sub(start).unwrap_or(chrono::Duration::zero())
567            })
568    }
569}
570
#[cfg(test)]
mod test {
    use pretty_assertions::assert_eq;
    use session::context::QueryContext;

    use super::*;
    use crate::batching_mode::time_window::find_time_window_expr;
    use crate::batching_mode::utils::sql_to_df_plan;
    use crate::test_utils::create_test_query_engine;

    /// Each case adds lower bounds, merges them with the given window size and expire lower
    /// bound, then checks the merged windows and the SQL of the generated filter expression.
    #[test]
    fn test_merge_dirty_time_windows() {
        // The merge threshold (in window sizes) the default instance actually merges with.
        let merge_dist = DirtyTimeWindows::default().time_window_merge_threshold;
        let testcases = vec![
            // just enough to merge: the gap after the first 5 min window is exactly
            // `merge_dist` windows
            (
                vec![
                    Timestamp::new_second(0),
                    Timestamp::new_second((1 + merge_dist as i64) * 5 * 60),
                ],
                (chrono::Duration::seconds(5 * 60), None),
                BTreeMap::from([(
                    Timestamp::new_second(0),
                    Some(Timestamp::new_second((2 + merge_dist as i64) * 5 * 60)),
                )]),
                Some(
                    "((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:25:00' AS TIMESTAMP)))",
                ),
            ),
            // separate time window: the gap is `merge_dist + 1` windows
            (
                vec![
                    Timestamp::new_second(0),
                    Timestamp::new_second((2 + merge_dist as i64) * 5 * 60),
                ],
                (chrono::Duration::seconds(5 * 60), None),
                BTreeMap::from([
                    (
                        Timestamp::new_second(0),
                        Some(Timestamp::new_second(5 * 60)),
                    ),
                    (
                        Timestamp::new_second((2 + merge_dist as i64) * 5 * 60),
                        Some(Timestamp::new_second((3 + merge_dist as i64) * 5 * 60)),
                    ),
                ]),
                Some(
                    "(((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:05:00' AS TIMESTAMP))) OR ((ts >= CAST('1970-01-01 00:25:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:30:00' AS TIMESTAMP))))",
                ),
            ),
            // overlapping: the gap is `merge_dist - 1` windows
            (
                vec![
                    Timestamp::new_second(0),
                    Timestamp::new_second((merge_dist as i64) * 5 * 60),
                ],
                (chrono::Duration::seconds(5 * 60), None),
                BTreeMap::from([(
                    Timestamp::new_second(0),
                    Some(Timestamp::new_second((1 + merge_dist as i64) * 5 * 60)),
                )]),
                Some(
                    "((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:20:00' AS TIMESTAMP)))",
                ),
            ),
            // complex overlapping: three 3s windows, each gap `merge_dist - 1` windows, all
            // merge into one
            (
                vec![
                    Timestamp::new_second(0),
                    Timestamp::new_second((merge_dist as i64) * 3),
                    Timestamp::new_second((merge_dist as i64) * 3 * 2),
                ],
                (chrono::Duration::seconds(3), None),
                BTreeMap::from([(
                    Timestamp::new_second(0),
                    Some(Timestamp::new_second((merge_dist as i64) * 7)),
                )]),
                Some(
                    "((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:00:21' AS TIMESTAMP)))",
                ),
            ),
            // split range: twenty contiguous 3s windows merge into [0, 60s), one more window
            // `merge_dist + 1` windows later stays separate; the query is capped at
            // 20 windows = 60s, so only the first range is queried
            (
                Vec::from_iter((0..20).map(|i| Timestamp::new_second(i * 3)).chain(
                    std::iter::once(Timestamp::new_second(60 + 3 * (merge_dist as i64 + 1))),
                )),
                (chrono::Duration::seconds(3), None),
                BTreeMap::from([
                    (Timestamp::new_second(0), Some(Timestamp::new_second(60))),
                    (
                        Timestamp::new_second(60 + 3 * (merge_dist as i64 + 1)),
                        Some(Timestamp::new_second(
                            60 + 3 * (merge_dist as i64 + 1) + 3,
                        )),
                    ),
                ]),
                Some(
                    "((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:01:00' AS TIMESTAMP)))",
                ),
            ),
            // split 2 min into 1 min: forty contiguous 3s windows merge into [0, 120s), and only
            // the first 60s fit into one query
            (
                Vec::from_iter((0..40).map(|i| Timestamp::new_second(i * 3))),
                (chrono::Duration::seconds(3), None),
                BTreeMap::from([(
                    Timestamp::new_second(0),
                    Some(Timestamp::new_second(40 * 3)),
                )]),
                Some(
                    "((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:01:00' AS TIMESTAMP)))",
                ),
            ),
            // split 3s + 1min into 3s + 57s: [0, 3s) stays separate, [20s, 140s) is cut so the
            // query covers 60s in total
            (
                Vec::from_iter(
                    std::iter::once(Timestamp::new_second(0))
                        .chain((0..40).map(|i| Timestamp::new_second(20 + i * 3))),
                ),
                (chrono::Duration::seconds(3), None),
                BTreeMap::from([
                    (Timestamp::new_second(0), Some(Timestamp::new_second(3))),
                    (Timestamp::new_second(20), Some(Timestamp::new_second(140))),
                ]),
                Some(
                    "(((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:00:03' AS TIMESTAMP))) OR ((ts >= CAST('1970-01-01 00:00:20' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:01:17' AS TIMESTAMP))))",
                ),
            ),
            // expired: both windows start before the expire lower bound and are dropped
            (
                vec![
                    Timestamp::new_second(0),
                    Timestamp::new_second((merge_dist as i64) * 5 * 60),
                ],
                (
                    chrono::Duration::seconds(5 * 60),
                    Some(Timestamp::new_second((merge_dist as i64) * 6 * 60)),
                ),
                BTreeMap::from([]),
                None,
            ),
        ];
        for (lower_bounds, (window_size, expire_lower_bound), expected, expected_filter_expr) in
            testcases
        {
            let mut dirty = DirtyTimeWindows::default();
            dirty.add_lower_bounds(lower_bounds.into_iter());
            dirty
                .merge_dirty_time_windows(window_size, expire_lower_bound)
                .unwrap();
            assert_eq!(expected, dirty.windows);
            let filter_expr = dirty
                .gen_filter_exprs(
                    "ts",
                    expire_lower_bound,
                    window_size,
                    dirty.max_filter_num_per_query,
                    0,
                    None,
                )
                .unwrap()
                .map(|e| e.expr);

            let unparser = datafusion::sql::unparser::Unparser::default();
            let to_sql = filter_expr
                .as_ref()
                .map(|e| unparser.expr_to_sql(e).unwrap().to_string());
            assert_eq!(expected_filter_expr, to_sql.as_deref());
        }
    }

    /// Windows are aligned to a 5 second `date_bin`: the start snaps down to its bin, an end
    /// that already is a bin boundary is kept, and any other end snaps up to the end of its bin.
    #[tokio::test]
    async fn test_align_time_window() {
        type TimeWindow = (Timestamp, Option<Timestamp>);
        struct TestCase {
            sql: String,
            aligns: Vec<(TimeWindow, TimeWindow)>,
        }
        let testcases: Vec<TestCase> = vec![TestCase{
            sql: "SELECT date_bin(INTERVAL '5 second', ts) AS time_window FROM numbers_with_ts GROUP BY time_window;".to_string(),
            aligns: vec![
                ((Timestamp::new_second(3), None), (Timestamp::new_second(0), None)),
                ((Timestamp::new_second(8), None), (Timestamp::new_second(5), None)),
                ((Timestamp::new_second(8), Some(Timestamp::new_second(10))), (Timestamp::new_second(5), Some(Timestamp::new_second(10)))),
                ((Timestamp::new_second(8), Some(Timestamp::new_second(9))), (Timestamp::new_second(5), Some(Timestamp::new_second(10)))),
            ],
        }];

        let query_engine = create_test_query_engine();
        let ctx = QueryContext::arc();
        for TestCase { sql, aligns } in testcases {
            let plan = sql_to_df_plan(ctx.clone(), query_engine.clone(), &sql, true)
                .await
                .unwrap();

            let (column_name, time_window_expr, _, df_schema) = find_time_window_expr(
                &plan,
                query_engine.engine_state().catalog_manager().clone(),
                ctx.clone(),
            )
            .await
            .unwrap();

            let time_window_expr = time_window_expr
                .map(|expr| {
                    TimeWindowExpr::from_expr(
                        &expr,
                        &column_name,
                        &df_schema,
                        &query_engine.engine_state().session_state(),
                    )
                })
                .transpose()
                .unwrap()
                .unwrap();

            let dirty = DirtyTimeWindows::default();
            for (before_align, expected_after_align) in aligns {
                let after_align = dirty
                    .align_time_window(before_align.0, before_align.1, &time_window_expr)
                    .unwrap();
                assert_eq!(expected_after_align, after_align);
            }
        }
    }
}