flow/batching_mode/
state.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Batching mode task state, which changes frequently
16
17use std::collections::BTreeMap;
18use std::time::Duration;
19
20use common_telemetry::debug;
21use common_telemetry::tracing::warn;
22use common_time::Timestamp;
23use datatypes::value::Value;
24use session::context::QueryContextRef;
25use snafu::{OptionExt, ResultExt};
26use tokio::sync::oneshot;
27use tokio::time::Instant;
28
29use crate::batching_mode::task::BatchingTask;
30use crate::batching_mode::time_window::TimeWindowExpr;
31use crate::batching_mode::MIN_REFRESH_DURATION;
32use crate::error::{DatatypesSnafu, InternalSnafu, TimeSnafu, UnexpectedSnafu};
33use crate::metrics::{
34    METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME_RANGE, METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_CNT,
35};
36use crate::{Error, FlowId};
37
38/// The state of the [`BatchingTask`].
39#[derive(Debug)]
40pub struct TaskState {
41    /// Query context
42    pub(crate) query_ctx: QueryContextRef,
43    /// last query complete time
44    last_update_time: Instant,
45    /// last time query duration
46    last_query_duration: Duration,
47    /// Dirty Time windows need to be updated
48    /// mapping of `start -> end` and non-overlapping
49    pub(crate) dirty_time_windows: DirtyTimeWindows,
50    exec_state: ExecState,
51    /// Shutdown receiver
52    pub(crate) shutdown_rx: oneshot::Receiver<()>,
53    /// Task handle
54    pub(crate) task_handle: Option<tokio::task::JoinHandle<()>>,
55}
56impl TaskState {
57    pub fn new(query_ctx: QueryContextRef, shutdown_rx: oneshot::Receiver<()>) -> Self {
58        Self {
59            query_ctx,
60            last_update_time: Instant::now(),
61            last_query_duration: Duration::from_secs(0),
62            dirty_time_windows: Default::default(),
63            exec_state: ExecState::Idle,
64            shutdown_rx,
65            task_handle: None,
66        }
67    }
68
69    /// called after last query is done
70    /// `is_succ` indicate whether the last query is successful
71    pub fn after_query_exec(&mut self, elapsed: Duration, _is_succ: bool) {
72        self.exec_state = ExecState::Idle;
73        self.last_query_duration = elapsed;
74        self.last_update_time = Instant::now();
75    }
76
77    /// Compute the next query delay based on the time window size or the last query duration.
78    /// Aiming to avoid too frequent queries. But also not too long delay.
79    ///
80    /// next wait time is calculated as:
81    /// last query duration, capped by [max(min_run_interval, time_window_size), max_timeout],
82    /// note at most wait for `max_timeout`.
83    ///
84    /// if current the dirty time range is longer than one query can handle,
85    /// execute immediately to faster clean up dirty time windows.
86    ///
87    pub fn get_next_start_query_time(
88        &self,
89        flow_id: FlowId,
90        time_window_size: &Option<Duration>,
91        max_timeout: Option<Duration>,
92    ) -> Instant {
93        // = last query duration, capped by [max(min_run_interval, time_window_size), max_timeout], note at most `max_timeout`
94        let lower = time_window_size.unwrap_or(MIN_REFRESH_DURATION);
95        let next_duration = self.last_query_duration.max(lower);
96        let next_duration = if let Some(max_timeout) = max_timeout {
97            next_duration.min(max_timeout)
98        } else {
99            next_duration
100        };
101
102        let cur_dirty_window_size = self.dirty_time_windows.window_size();
103        // compute how much time range can be handled in one query
104        let max_query_update_range = (*time_window_size)
105            .unwrap_or_default()
106            .mul_f64(DirtyTimeWindows::MAX_FILTER_NUM as f64);
107        // if dirty time range is more than one query can handle, execute immediately
108        // to faster clean up dirty time windows
109        if cur_dirty_window_size < max_query_update_range {
110            self.last_update_time + next_duration
111        } else {
112            // if dirty time windows can't be clean up in one query, execute immediately to faster
113            // clean up dirty time windows
114            debug!(
115                "Flow id = {}, still have too many {} dirty time window({:?}), execute immediately",
116                flow_id,
117                self.dirty_time_windows.windows.len(),
118                self.dirty_time_windows.windows
119            );
120            Instant::now()
121        }
122    }
123}
124
125/// For keep recording of dirty time windows, which is time window that have new data inserted
126/// since last query.
127#[derive(Debug, Clone, Default)]
128pub struct DirtyTimeWindows {
129    /// windows's `start -> end` and non-overlapping
130    /// `end` is exclusive(and optional)
131    windows: BTreeMap<Timestamp, Option<Timestamp>>,
132}
133
134impl DirtyTimeWindows {
135    /// Time window merge distance
136    ///
137    /// TODO(discord9): make those configurable
138    pub const MERGE_DIST: i32 = 3;
139
140    /// Maximum number of filters allowed in a single query
141    pub const MAX_FILTER_NUM: usize = 20;
142
143    /// Add lower bounds to the dirty time windows. Upper bounds are ignored.
144    ///
145    /// # Arguments
146    ///
147    /// * `lower_bounds` - An iterator of lower bounds to be added.
148    pub fn add_lower_bounds(&mut self, lower_bounds: impl Iterator<Item = Timestamp>) {
149        for lower_bound in lower_bounds {
150            let entry = self.windows.entry(lower_bound);
151            entry.or_insert(None);
152        }
153    }
154
155    pub fn window_size(&self) -> Duration {
156        let mut ret = Duration::from_secs(0);
157        for (start, end) in &self.windows {
158            if let Some(end) = end {
159                if let Some(duration) = end.sub(start) {
160                    ret += duration.to_std().unwrap_or_default();
161                }
162            }
163        }
164        ret
165    }
166
167    pub fn add_window(&mut self, start: Timestamp, end: Option<Timestamp>) {
168        self.windows.insert(start, end);
169    }
170
171    /// Clean all dirty time windows, useful when can't found time window expr
172    pub fn clean(&mut self) {
173        self.windows.clear();
174    }
175
176    /// Number of dirty windows.
177    pub fn len(&self) -> usize {
178        self.windows.len()
179    }
180
181    /// Generate all filter expressions consuming all time windows
182    ///
183    /// there is two limits:
184    /// - shouldn't return a too long time range(<=`window_size * window_cnt`), so that the query can be executed in a reasonable time
185    /// - shouldn't return too many time range exprs, so that the query can be parsed properly instead of causing parser to overflow
186    pub fn gen_filter_exprs(
187        &mut self,
188        col_name: &str,
189        expire_lower_bound: Option<Timestamp>,
190        window_size: chrono::Duration,
191        window_cnt: usize,
192        flow_id: FlowId,
193        task_ctx: Option<&BatchingTask>,
194    ) -> Result<Option<datafusion_expr::Expr>, Error> {
195        debug!(
196            "expire_lower_bound: {:?}, window_size: {:?}",
197            expire_lower_bound.map(|t| t.to_iso8601_string()),
198            window_size
199        );
200        self.merge_dirty_time_windows(window_size, expire_lower_bound)?;
201
202        if self.windows.len() > Self::MAX_FILTER_NUM {
203            let first_time_window = self.windows.first_key_value();
204            let last_time_window = self.windows.last_key_value();
205
206            if let Some(task_ctx) = task_ctx {
207                warn!(
208                "Flow id = {:?}, too many time windows: {}, only the first {} are taken for this query, the group by expression might be wrong. Time window expr={:?}, expire_after={:?}, first_time_window={:?}, last_time_window={:?}, the original query: {:?}",
209                task_ctx.config.flow_id,
210                self.windows.len(),
211                Self::MAX_FILTER_NUM,
212                task_ctx.config.time_window_expr,
213                task_ctx.config.expire_after,
214                first_time_window,
215                last_time_window,
216                task_ctx.config.query
217            );
218            } else {
219                warn!("Flow id = {:?}, too many time windows: {}, only the first {} are taken for this query, the group by expression might be wrong. first_time_window={:?}, last_time_window={:?}",
220                flow_id,
221                self.windows.len(),
222                Self::MAX_FILTER_NUM,
223                first_time_window,
224                last_time_window
225                )
226            }
227        }
228
229        // get the first `window_cnt` time windows
230        let max_time_range = window_size * window_cnt as i32;
231        let nth = {
232            let mut cur_time_range = chrono::Duration::zero();
233            let mut nth_key = None;
234            for (idx, (start, end)) in self.windows.iter().enumerate() {
235                // if time range is too long, stop
236                if cur_time_range > max_time_range {
237                    nth_key = Some(*start);
238                    break;
239                }
240
241                // if we have enough time windows, stop
242                if idx >= window_cnt {
243                    nth_key = Some(*start);
244                    break;
245                }
246
247                if let Some(end) = end {
248                    if let Some(x) = end.sub(start) {
249                        cur_time_range += x;
250                    }
251                }
252            }
253
254            nth_key
255        };
256        let first_nth = {
257            if let Some(nth) = nth {
258                let mut after = self.windows.split_off(&nth);
259                std::mem::swap(&mut self.windows, &mut after);
260
261                after
262            } else {
263                std::mem::take(&mut self.windows)
264            }
265        };
266
267        METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_CNT
268            .with_label_values(&[flow_id.to_string().as_str()])
269            .observe(first_nth.len() as f64);
270
271        let full_time_range = first_nth
272            .iter()
273            .fold(chrono::Duration::zero(), |acc, (start, end)| {
274                if let Some(end) = end {
275                    acc + end.sub(start).unwrap_or(chrono::Duration::zero())
276                } else {
277                    acc
278                }
279            })
280            .num_seconds() as f64;
281        METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME_RANGE
282            .with_label_values(&[flow_id.to_string().as_str()])
283            .observe(full_time_range);
284
285        let mut expr_lst = vec![];
286        for (start, end) in first_nth.into_iter() {
287            // align using time window exprs
288            let (start, end) = if let Some(ctx) = task_ctx {
289                let Some(time_window_expr) = &ctx.config.time_window_expr else {
290                    UnexpectedSnafu {
291                        reason: "time_window_expr is not set",
292                    }
293                    .fail()?
294                };
295                self.align_time_window(start, end, time_window_expr)?
296            } else {
297                (start, end)
298            };
299            debug!(
300                "Time window start: {:?}, end: {:?}",
301                start.to_iso8601_string(),
302                end.map(|t| t.to_iso8601_string())
303            );
304
305            use datafusion_expr::{col, lit};
306            let lower = to_df_literal(start)?;
307            let upper = end.map(to_df_literal).transpose()?;
308            let expr = if let Some(upper) = upper {
309                col(col_name)
310                    .gt_eq(lit(lower))
311                    .and(col(col_name).lt(lit(upper)))
312            } else {
313                col(col_name).gt_eq(lit(lower))
314            };
315            expr_lst.push(expr);
316        }
317        let expr = expr_lst.into_iter().reduce(|a, b| a.or(b));
318        Ok(expr)
319    }
320
321    fn align_time_window(
322        &self,
323        start: Timestamp,
324        end: Option<Timestamp>,
325        time_window_expr: &TimeWindowExpr,
326    ) -> Result<(Timestamp, Option<Timestamp>), Error> {
327        let align_start = time_window_expr.eval(start)?.0.context(UnexpectedSnafu {
328            reason: format!(
329                "Failed to align start time {:?} with time window expr {:?}",
330                start, time_window_expr
331            ),
332        })?;
333        let align_end = end
334            .and_then(|end| {
335                time_window_expr
336                    .eval(end)
337                    // if after aligned, end is the same, then use end(because it's already aligned) else use aligned end
338                    .map(|r| if r.0 == Some(end) { r.0 } else { r.1 })
339                    .transpose()
340            })
341            .transpose()?;
342        Ok((align_start, align_end))
343    }
344
345    /// Merge time windows that overlaps or get too close
346    ///
347    /// TODO(discord9): not merge and prefer to send smaller time windows? how?
348    pub fn merge_dirty_time_windows(
349        &mut self,
350        window_size: chrono::Duration,
351        expire_lower_bound: Option<Timestamp>,
352    ) -> Result<(), Error> {
353        if self.windows.is_empty() {
354            return Ok(());
355        }
356
357        let mut new_windows = BTreeMap::new();
358
359        let std_window_size = window_size.to_std().map_err(|e| {
360            InternalSnafu {
361                reason: e.to_string(),
362            }
363            .build()
364        })?;
365
366        // previous time window
367        let mut prev_tw = None;
368        for (lower_bound, upper_bound) in std::mem::take(&mut self.windows) {
369            // filter out expired time window
370            if let Some(expire_lower_bound) = expire_lower_bound {
371                if lower_bound < expire_lower_bound {
372                    continue;
373                }
374            }
375
376            let Some(prev_tw) = &mut prev_tw else {
377                prev_tw = Some((lower_bound, upper_bound));
378                continue;
379            };
380
381            // if cur.lower - prev.upper <= window_size * MERGE_DIST, merge
382            // this also deal with overlap windows because cur.lower > prev.lower is always true
383            let prev_upper = prev_tw
384                .1
385                .unwrap_or(prev_tw.0.add_duration(std_window_size).context(TimeSnafu)?);
386            prev_tw.1 = Some(prev_upper);
387
388            let cur_upper = upper_bound.unwrap_or(
389                lower_bound
390                    .add_duration(std_window_size)
391                    .context(TimeSnafu)?,
392            );
393
394            if lower_bound
395                .sub(&prev_upper)
396                .map(|dist| dist <= window_size * Self::MERGE_DIST)
397                .unwrap_or(false)
398            {
399                prev_tw.1 = Some(cur_upper);
400            } else {
401                new_windows.insert(prev_tw.0, prev_tw.1);
402                *prev_tw = (lower_bound, Some(cur_upper));
403            }
404        }
405
406        if let Some(prev_tw) = prev_tw {
407            new_windows.insert(prev_tw.0, prev_tw.1);
408        }
409
410        self.windows = new_windows;
411
412        Ok(())
413    }
414}
415
416fn to_df_literal(value: Timestamp) -> Result<datafusion_common::ScalarValue, Error> {
417    let value = Value::from(value);
418    let value = value
419        .try_to_scalar_value(&value.data_type())
420        .with_context(|_| DatatypesSnafu {
421            extra: format!("Failed to convert to scalar value: {}", value),
422        })?;
423    Ok(value)
424}
425
/// Execution state of a batching task.
#[derive(Debug, Clone)]
enum ExecState {
    /// No query is running; set on creation and after each query completes.
    Idle,
    /// Presumably set while a query is running (not constructed in this file) — TODO confirm.
    Executing,
}
431
#[cfg(test)]
mod test {
    use pretty_assertions::assert_eq;
    use session::context::QueryContext;

    use super::*;
    use crate::batching_mode::time_window::find_time_window_expr;
    use crate::batching_mode::utils::sql_to_df_plan;
    use crate::test_utils::create_test_query_engine;

    /// One scenario for `test_merge_dirty_time_windows`.
    struct MergeCase {
        /// Lower bounds fed into `add_lower_bounds`.
        lower_bounds: Vec<Timestamp>,
        /// Window size used both for merging and for filter generation.
        window_size: chrono::Duration,
        /// Windows starting before this bound are expired.
        expire_lower_bound: Option<Timestamp>,
        /// Windows expected right after merging.
        expected_windows: BTreeMap<Timestamp, Option<Timestamp>>,
        /// Expected SQL of the generated filter; `None` when no filter is produced.
        expected_filter: Option<&'static str>,
    }

    /// Merging must join close/overlapping windows, keep distant ones apart and drop expired
    /// ones; the generated filter must match the merged windows.
    #[test]
    fn test_merge_dirty_time_windows() {
        let dist = DirtyTimeWindows::MERGE_DIST as i64;
        let five_minutes = chrono::Duration::seconds(5 * 60);

        let cases = vec![
            // just enough to merge
            MergeCase {
                lower_bounds: vec![
                    Timestamp::new_second(0),
                    Timestamp::new_second((1 + dist) * 5 * 60),
                ],
                window_size: five_minutes,
                expire_lower_bound: None,
                expected_windows: BTreeMap::from([(
                    Timestamp::new_second(0),
                    Some(Timestamp::new_second((2 + dist) * 5 * 60)),
                )]),
                expected_filter: Some(
                    "((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:25:00' AS TIMESTAMP)))",
                ),
            },
            // separate time window
            MergeCase {
                lower_bounds: vec![
                    Timestamp::new_second(0),
                    Timestamp::new_second((2 + dist) * 5 * 60),
                ],
                window_size: five_minutes,
                expire_lower_bound: None,
                expected_windows: BTreeMap::from([
                    (
                        Timestamp::new_second(0),
                        Some(Timestamp::new_second(5 * 60)),
                    ),
                    (
                        Timestamp::new_second((2 + dist) * 5 * 60),
                        Some(Timestamp::new_second((3 + dist) * 5 * 60)),
                    ),
                ]),
                expected_filter: Some(
                    "(((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:05:00' AS TIMESTAMP))) OR ((ts >= CAST('1970-01-01 00:25:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:30:00' AS TIMESTAMP))))",
                ),
            },
            // overlapping
            MergeCase {
                lower_bounds: vec![
                    Timestamp::new_second(0),
                    Timestamp::new_second(dist * 5 * 60),
                ],
                window_size: five_minutes,
                expire_lower_bound: None,
                expected_windows: BTreeMap::from([(
                    Timestamp::new_second(0),
                    Some(Timestamp::new_second((1 + dist) * 5 * 60)),
                )]),
                expected_filter: Some(
                    "((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:20:00' AS TIMESTAMP)))",
                ),
            },
            // complex overlapping
            MergeCase {
                lower_bounds: vec![
                    Timestamp::new_second(0),
                    Timestamp::new_second(dist * 3),
                    Timestamp::new_second(dist * 3 * 2),
                ],
                window_size: chrono::Duration::seconds(3),
                expire_lower_bound: None,
                expected_windows: BTreeMap::from([(
                    Timestamp::new_second(0),
                    Some(Timestamp::new_second(dist * 7)),
                )]),
                expected_filter: Some(
                    "((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:00:21' AS TIMESTAMP)))",
                ),
            },
            // expired
            MergeCase {
                lower_bounds: vec![
                    Timestamp::new_second(0),
                    Timestamp::new_second(dist * 5 * 60),
                ],
                window_size: five_minutes,
                expire_lower_bound: Some(Timestamp::new_second(dist * 6 * 60)),
                expected_windows: BTreeMap::new(),
                expected_filter: None,
            },
        ];

        let unparser = datafusion::sql::unparser::Unparser::default();
        for case in cases {
            let mut dirty = DirtyTimeWindows::default();
            dirty.add_lower_bounds(case.lower_bounds.into_iter());
            dirty
                .merge_dirty_time_windows(case.window_size, case.expire_lower_bound)
                .unwrap();
            assert_eq!(case.expected_windows, dirty.windows);

            let filter_expr = dirty
                .gen_filter_exprs(
                    "ts",
                    case.expire_lower_bound,
                    case.window_size,
                    DirtyTimeWindows::MAX_FILTER_NUM,
                    0,
                    None,
                )
                .unwrap();

            let rendered_sql = filter_expr
                .as_ref()
                .map(|expr| unparser.expr_to_sql(expr).unwrap().to_string());
            assert_eq!(case.expected_filter, rendered_sql.as_deref());
        }
    }

    /// Window bounds must be aligned to the `date_bin` buckets of the query.
    #[tokio::test]
    async fn test_align_time_window() {
        type TimeWindow = (Timestamp, Option<Timestamp>);
        let sec = |s: i64| Timestamp::new_second(s);

        // (sql, [(window before alignment, expected window after alignment)])
        let cases: Vec<(&str, Vec<(TimeWindow, TimeWindow)>)> = vec![(
            "SELECT date_bin(INTERVAL '5 second', ts) AS time_window FROM numbers_with_ts GROUP BY time_window;",
            vec![
                ((sec(3), None), (sec(0), None)),
                ((sec(8), None), (sec(5), None)),
                ((sec(8), Some(sec(10))), (sec(5), Some(sec(10)))),
                ((sec(8), Some(sec(9))), (sec(5), Some(sec(10)))),
            ],
        )];

        let query_engine = create_test_query_engine();
        let ctx = QueryContext::arc();
        for (sql, aligns) in cases {
            let plan = sql_to_df_plan(ctx.clone(), query_engine.clone(), sql, true)
                .await
                .unwrap();

            let (column_name, raw_expr, _, df_schema) = find_time_window_expr(
                &plan,
                query_engine.engine_state().catalog_manager().clone(),
                ctx.clone(),
            )
            .await
            .unwrap();

            let time_window_expr = raw_expr
                .map(|expr| {
                    TimeWindowExpr::from_expr(
                        &expr,
                        &column_name,
                        &df_schema,
                        &query_engine.engine_state().session_state(),
                    )
                })
                .transpose()
                .unwrap()
                .unwrap();

            let dirty = DirtyTimeWindows::default();
            for ((start, end), expected) in aligns {
                let aligned = dirty
                    .align_time_window(start, end, &time_window_expr)
                    .unwrap();
                assert_eq!(expected, aligned);
            }
        }
    }
}