flow/batching_mode/
state.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Batching mode task state, which changes frequently
16
17use std::collections::BTreeMap;
18use std::time::Duration;
19
20use common_telemetry::debug;
21use common_telemetry::tracing::warn;
22use common_time::Timestamp;
23use datatypes::value::Value;
24use session::context::QueryContextRef;
25use snafu::{OptionExt, ResultExt};
26use tokio::sync::oneshot;
27use tokio::time::Instant;
28
29use crate::batching_mode::task::BatchingTask;
30use crate::batching_mode::time_window::TimeWindowExpr;
31use crate::batching_mode::MIN_REFRESH_DURATION;
32use crate::error::{DatatypesSnafu, InternalSnafu, TimeSnafu, UnexpectedSnafu};
33use crate::metrics::{
34    METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME_RANGE, METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_CNT,
35};
36use crate::{Error, FlowId};
37
38/// The state of the [`BatchingTask`].
39#[derive(Debug)]
40pub struct TaskState {
41    /// Query context
42    pub(crate) query_ctx: QueryContextRef,
43    /// last query complete time
44    last_update_time: Instant,
45    /// last time query duration
46    last_query_duration: Duration,
47    /// Dirty Time windows need to be updated
48    /// mapping of `start -> end` and non-overlapping
49    pub(crate) dirty_time_windows: DirtyTimeWindows,
50    exec_state: ExecState,
51    /// Shutdown receiver
52    pub(crate) shutdown_rx: oneshot::Receiver<()>,
53    /// Task handle
54    pub(crate) task_handle: Option<tokio::task::JoinHandle<()>>,
55}
56impl TaskState {
57    pub fn new(query_ctx: QueryContextRef, shutdown_rx: oneshot::Receiver<()>) -> Self {
58        Self {
59            query_ctx,
60            last_update_time: Instant::now(),
61            last_query_duration: Duration::from_secs(0),
62            dirty_time_windows: Default::default(),
63            exec_state: ExecState::Idle,
64            shutdown_rx,
65            task_handle: None,
66        }
67    }
68
69    /// called after last query is done
70    /// `is_succ` indicate whether the last query is successful
71    pub fn after_query_exec(&mut self, elapsed: Duration, _is_succ: bool) {
72        self.exec_state = ExecState::Idle;
73        self.last_query_duration = elapsed;
74        self.last_update_time = Instant::now();
75    }
76
77    /// Compute the next query delay based on the time window size or the last query duration.
78    /// Aiming to avoid too frequent queries. But also not too long delay.
79    /// The delay is computed as follows:
80    /// - If `time_window_size` is set, the delay is half the time window size, constrained to be
81    ///   at least `last_query_duration` and at most `max_timeout`.
82    /// - If `time_window_size` is not set, the delay defaults to `last_query_duration`, constrained
83    ///   to be at least `MIN_REFRESH_DURATION` and at most `max_timeout`.
84    ///
85    /// If there are dirty time windows, the function returns an immediate execution time to clean them.
86    /// TODO: Make this behavior configurable.
87    pub fn get_next_start_query_time(
88        &self,
89        flow_id: FlowId,
90        time_window_size: &Option<Duration>,
91        max_timeout: Option<Duration>,
92    ) -> Instant {
93        let last_duration = max_timeout
94            .unwrap_or(self.last_query_duration)
95            .min(self.last_query_duration)
96            .max(MIN_REFRESH_DURATION);
97
98        let next_duration = time_window_size
99            .map(|t| {
100                let half = t / 2;
101                half.max(last_duration)
102            })
103            .unwrap_or(last_duration);
104
105        // if have dirty time window, execute immediately to clean dirty time window
106        if self.dirty_time_windows.windows.is_empty() {
107            self.last_update_time + next_duration
108        } else {
109            debug!(
110                "Flow id = {}, still have {} dirty time window({:?}), execute immediately",
111                flow_id,
112                self.dirty_time_windows.windows.len(),
113                self.dirty_time_windows.windows
114            );
115            Instant::now()
116        }
117    }
118}
119
120/// For keep recording of dirty time windows, which is time window that have new data inserted
121/// since last query.
122#[derive(Debug, Clone, Default)]
123pub struct DirtyTimeWindows {
124    /// windows's `start -> end` and non-overlapping
125    /// `end` is exclusive(and optional)
126    windows: BTreeMap<Timestamp, Option<Timestamp>>,
127}
128
129impl DirtyTimeWindows {
130    /// Time window merge distance
131    ///
132    /// TODO(discord9): make those configurable
133    pub const MERGE_DIST: i32 = 3;
134
135    /// Maximum number of filters allowed in a single query
136    pub const MAX_FILTER_NUM: usize = 20;
137
138    /// Add lower bounds to the dirty time windows. Upper bounds are ignored.
139    ///
140    /// # Arguments
141    ///
142    /// * `lower_bounds` - An iterator of lower bounds to be added.
143    pub fn add_lower_bounds(&mut self, lower_bounds: impl Iterator<Item = Timestamp>) {
144        for lower_bound in lower_bounds {
145            let entry = self.windows.entry(lower_bound);
146            entry.or_insert(None);
147        }
148    }
149
150    pub fn add_window(&mut self, start: Timestamp, end: Option<Timestamp>) {
151        self.windows.insert(start, end);
152    }
153
154    /// Clean all dirty time windows, useful when can't found time window expr
155    pub fn clean(&mut self) {
156        self.windows.clear();
157    }
158
159    /// Generate all filter expressions consuming all time windows
160    ///
161    /// there is two limits:
162    /// - shouldn't return a too long time range(<=`window_size * window_cnt`), so that the query can be executed in a reasonable time
163    /// - shouldn't return too many time range exprs, so that the query can be parsed properly instead of causing parser to overflow
164    pub fn gen_filter_exprs(
165        &mut self,
166        col_name: &str,
167        expire_lower_bound: Option<Timestamp>,
168        window_size: chrono::Duration,
169        window_cnt: usize,
170        flow_id: FlowId,
171        task_ctx: Option<&BatchingTask>,
172    ) -> Result<Option<datafusion_expr::Expr>, Error> {
173        debug!(
174            "expire_lower_bound: {:?}, window_size: {:?}",
175            expire_lower_bound.map(|t| t.to_iso8601_string()),
176            window_size
177        );
178        self.merge_dirty_time_windows(window_size, expire_lower_bound)?;
179
180        if self.windows.len() > Self::MAX_FILTER_NUM {
181            let first_time_window = self.windows.first_key_value();
182            let last_time_window = self.windows.last_key_value();
183
184            if let Some(task_ctx) = task_ctx {
185                warn!(
186                "Flow id = {:?}, too many time windows: {}, only the first {} are taken for this query, the group by expression might be wrong. Time window expr={:?}, expire_after={:?}, first_time_window={:?}, last_time_window={:?}, the original query: {:?}",
187                task_ctx.config.flow_id,
188                self.windows.len(),
189                Self::MAX_FILTER_NUM,
190                task_ctx.config.time_window_expr,
191                task_ctx.config.expire_after,
192                first_time_window,
193                last_time_window,
194                task_ctx.config.query
195            );
196            } else {
197                warn!("Flow id = {:?}, too many time windows: {}, only the first {} are taken for this query, the group by expression might be wrong. first_time_window={:?}, last_time_window={:?}",
198                flow_id,
199                self.windows.len(),
200                Self::MAX_FILTER_NUM,
201                first_time_window,
202                last_time_window
203                )
204            }
205        }
206
207        // get the first `window_cnt` time windows
208        let max_time_range = window_size * window_cnt as i32;
209        let nth = {
210            let mut cur_time_range = chrono::Duration::zero();
211            let mut nth_key = None;
212            for (idx, (start, end)) in self.windows.iter().enumerate() {
213                // if time range is too long, stop
214                if cur_time_range > max_time_range {
215                    nth_key = Some(*start);
216                    break;
217                }
218
219                // if we have enough time windows, stop
220                if idx >= window_cnt {
221                    nth_key = Some(*start);
222                    break;
223                }
224
225                if let Some(end) = end {
226                    if let Some(x) = end.sub(start) {
227                        cur_time_range += x;
228                    }
229                }
230            }
231
232            nth_key
233        };
234        let first_nth = {
235            if let Some(nth) = nth {
236                let mut after = self.windows.split_off(&nth);
237                std::mem::swap(&mut self.windows, &mut after);
238
239                after
240            } else {
241                std::mem::take(&mut self.windows)
242            }
243        };
244
245        METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_CNT
246            .with_label_values(&[flow_id.to_string().as_str()])
247            .observe(first_nth.len() as f64);
248
249        let full_time_range = first_nth
250            .iter()
251            .fold(chrono::Duration::zero(), |acc, (start, end)| {
252                if let Some(end) = end {
253                    acc + end.sub(start).unwrap_or(chrono::Duration::zero())
254                } else {
255                    acc
256                }
257            })
258            .num_seconds() as f64;
259        METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME_RANGE
260            .with_label_values(&[flow_id.to_string().as_str()])
261            .observe(full_time_range);
262
263        let mut expr_lst = vec![];
264        for (start, end) in first_nth.into_iter() {
265            // align using time window exprs
266            let (start, end) = if let Some(ctx) = task_ctx {
267                let Some(time_window_expr) = &ctx.config.time_window_expr else {
268                    UnexpectedSnafu {
269                        reason: "time_window_expr is not set",
270                    }
271                    .fail()?
272                };
273                self.align_time_window(start, end, time_window_expr)?
274            } else {
275                (start, end)
276            };
277            debug!(
278                "Time window start: {:?}, end: {:?}",
279                start.to_iso8601_string(),
280                end.map(|t| t.to_iso8601_string())
281            );
282
283            use datafusion_expr::{col, lit};
284            let lower = to_df_literal(start)?;
285            let upper = end.map(to_df_literal).transpose()?;
286            let expr = if let Some(upper) = upper {
287                col(col_name)
288                    .gt_eq(lit(lower))
289                    .and(col(col_name).lt(lit(upper)))
290            } else {
291                col(col_name).gt_eq(lit(lower))
292            };
293            expr_lst.push(expr);
294        }
295        let expr = expr_lst.into_iter().reduce(|a, b| a.or(b));
296        Ok(expr)
297    }
298
299    fn align_time_window(
300        &self,
301        start: Timestamp,
302        end: Option<Timestamp>,
303        time_window_expr: &TimeWindowExpr,
304    ) -> Result<(Timestamp, Option<Timestamp>), Error> {
305        let align_start = time_window_expr.eval(start)?.0.context(UnexpectedSnafu {
306            reason: format!(
307                "Failed to align start time {:?} with time window expr {:?}",
308                start, time_window_expr
309            ),
310        })?;
311        let align_end = end
312            .and_then(|end| {
313                time_window_expr
314                    .eval(end)
315                    // if after aligned, end is the same, then use end(because it's already aligned) else use aligned end
316                    .map(|r| if r.0 == Some(end) { r.0 } else { r.1 })
317                    .transpose()
318            })
319            .transpose()?;
320        Ok((align_start, align_end))
321    }
322
323    /// Merge time windows that overlaps or get too close
324    ///
325    /// TODO(discord9): not merge and prefer to send smaller time windows? how?
326    pub fn merge_dirty_time_windows(
327        &mut self,
328        window_size: chrono::Duration,
329        expire_lower_bound: Option<Timestamp>,
330    ) -> Result<(), Error> {
331        if self.windows.is_empty() {
332            return Ok(());
333        }
334
335        let mut new_windows = BTreeMap::new();
336
337        let std_window_size = window_size.to_std().map_err(|e| {
338            InternalSnafu {
339                reason: e.to_string(),
340            }
341            .build()
342        })?;
343
344        // previous time window
345        let mut prev_tw = None;
346        for (lower_bound, upper_bound) in std::mem::take(&mut self.windows) {
347            // filter out expired time window
348            if let Some(expire_lower_bound) = expire_lower_bound {
349                if lower_bound < expire_lower_bound {
350                    continue;
351                }
352            }
353
354            let Some(prev_tw) = &mut prev_tw else {
355                prev_tw = Some((lower_bound, upper_bound));
356                continue;
357            };
358
359            // if cur.lower - prev.upper <= window_size * MERGE_DIST, merge
360            // this also deal with overlap windows because cur.lower > prev.lower is always true
361            let prev_upper = prev_tw
362                .1
363                .unwrap_or(prev_tw.0.add_duration(std_window_size).context(TimeSnafu)?);
364            prev_tw.1 = Some(prev_upper);
365
366            let cur_upper = upper_bound.unwrap_or(
367                lower_bound
368                    .add_duration(std_window_size)
369                    .context(TimeSnafu)?,
370            );
371
372            if lower_bound
373                .sub(&prev_upper)
374                .map(|dist| dist <= window_size * Self::MERGE_DIST)
375                .unwrap_or(false)
376            {
377                prev_tw.1 = Some(cur_upper);
378            } else {
379                new_windows.insert(prev_tw.0, prev_tw.1);
380                *prev_tw = (lower_bound, Some(cur_upper));
381            }
382        }
383
384        if let Some(prev_tw) = prev_tw {
385            new_windows.insert(prev_tw.0, prev_tw.1);
386        }
387
388        self.windows = new_windows;
389
390        Ok(())
391    }
392}
393
394fn to_df_literal(value: Timestamp) -> Result<datafusion_common::ScalarValue, Error> {
395    let value = Value::from(value);
396    let value = value
397        .try_to_scalar_value(&value.data_type())
398        .with_context(|_| DatatypesSnafu {
399            extra: format!("Failed to convert to scalar value: {}", value),
400        })?;
401    Ok(value)
402}
403
/// Execution state tracked by `TaskState`.
#[derive(Clone, Debug)]
enum ExecState {
    /// No query running; set once a query finishes (`TaskState::after_query_exec`).
    Idle,
    /// A query is in progress.
    /// NOTE(review): not constructed anywhere in this file.
    Executing,
}
409
#[cfg(test)]
mod test {
    use pretty_assertions::assert_eq;
    use session::context::QueryContext;

    use super::*;
    use crate::batching_mode::time_window::find_time_window_expr;
    use crate::batching_mode::utils::sql_to_df_plan;
    use crate::test_utils::create_test_query_engine;

    /// Shorthand for a second-precision timestamp.
    fn secs(second: i64) -> Timestamp {
        Timestamp::new_second(second)
    }

    /// One scenario for `merge_dirty_time_windows` + `gen_filter_exprs`.
    struct MergeCase {
        lower_bounds: Vec<Timestamp>,
        window_size: chrono::Duration,
        expire_lower_bound: Option<Timestamp>,
        expected: BTreeMap<Timestamp, Option<Timestamp>>,
        expected_filter: Option<&'static str>,
    }

    #[test]
    fn test_merge_dirty_time_windows() {
        const DIST: i64 = DirtyTimeWindows::MERGE_DIST as i64;
        let five_min = chrono::Duration::seconds(5 * 60);

        let cases = vec![
            // just enough to merge
            MergeCase {
                lower_bounds: vec![secs(0), secs((1 + DIST) * 5 * 60)],
                window_size: five_min,
                expire_lower_bound: None,
                expected: BTreeMap::from([(secs(0), Some(secs((2 + DIST) * 5 * 60)))]),
                expected_filter: Some(
                    "((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:25:00' AS TIMESTAMP)))",
                ),
            },
            // separate time window
            MergeCase {
                lower_bounds: vec![secs(0), secs((2 + DIST) * 5 * 60)],
                window_size: five_min,
                expire_lower_bound: None,
                expected: BTreeMap::from([
                    (secs(0), Some(secs(5 * 60))),
                    (
                        secs((2 + DIST) * 5 * 60),
                        Some(secs((3 + DIST) * 5 * 60)),
                    ),
                ]),
                expected_filter: Some(
                    "(((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:05:00' AS TIMESTAMP))) OR ((ts >= CAST('1970-01-01 00:25:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:30:00' AS TIMESTAMP))))",
                ),
            },
            // overlapping
            MergeCase {
                lower_bounds: vec![secs(0), secs(DIST * 5 * 60)],
                window_size: five_min,
                expire_lower_bound: None,
                expected: BTreeMap::from([(secs(0), Some(secs((1 + DIST) * 5 * 60)))]),
                expected_filter: Some(
                    "((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:20:00' AS TIMESTAMP)))",
                ),
            },
            // complex overlapping
            MergeCase {
                lower_bounds: vec![secs(0), secs(DIST * 3), secs(DIST * 3 * 2)],
                window_size: chrono::Duration::seconds(3),
                expire_lower_bound: None,
                expected: BTreeMap::from([(secs(0), Some(secs(DIST * 7)))]),
                expected_filter: Some(
                    "((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:00:21' AS TIMESTAMP)))",
                ),
            },
            // expired
            MergeCase {
                lower_bounds: vec![secs(0), secs(DIST * 5 * 60)],
                window_size: five_min,
                expire_lower_bound: Some(secs(DIST * 6 * 60)),
                expected: BTreeMap::new(),
                expected_filter: None,
            },
        ];

        let unparser = datafusion::sql::unparser::Unparser::default();
        for case in cases {
            let mut dirty = DirtyTimeWindows::default();
            dirty.add_lower_bounds(case.lower_bounds.into_iter());
            dirty
                .merge_dirty_time_windows(case.window_size, case.expire_lower_bound)
                .unwrap();
            assert_eq!(case.expected, dirty.windows);

            let filter_sql = dirty
                .gen_filter_exprs(
                    "ts",
                    case.expire_lower_bound,
                    case.window_size,
                    DirtyTimeWindows::MAX_FILTER_NUM,
                    0,
                    None,
                )
                .unwrap()
                .map(|expr| unparser.expr_to_sql(&expr).unwrap().to_string());
            assert_eq!(case.expected_filter, filter_sql.as_deref());
        }
    }

    #[tokio::test]
    async fn test_align_time_window() {
        type TimeWindow = (Timestamp, Option<Timestamp>);

        let cases: Vec<(&str, Vec<(TimeWindow, TimeWindow)>)> = vec![(
            "SELECT date_bin(INTERVAL '5 second', ts) AS time_window FROM numbers_with_ts GROUP BY time_window;",
            vec![
                ((secs(3), None), (secs(0), None)),
                ((secs(8), None), (secs(5), None)),
                ((secs(8), Some(secs(10))), (secs(5), Some(secs(10)))),
                ((secs(8), Some(secs(9))), (secs(5), Some(secs(10)))),
            ],
        )];

        let query_engine = create_test_query_engine();
        let ctx = QueryContext::arc();
        for (sql, aligns) in cases {
            let plan = sql_to_df_plan(ctx.clone(), query_engine.clone(), sql, true)
                .await
                .unwrap();

            let (column_name, time_window_expr, _, df_schema) = find_time_window_expr(
                &plan,
                query_engine.engine_state().catalog_manager().clone(),
                ctx.clone(),
            )
            .await
            .unwrap();

            let raw_expr = time_window_expr.unwrap();
            let time_window_expr = TimeWindowExpr::from_expr(
                &raw_expr,
                &column_name,
                &df_schema,
                &query_engine.engine_state().session_state(),
            )
            .unwrap();

            let dirty = DirtyTimeWindows::default();
            for ((start, end), expected) in aligns {
                let actual = dirty
                    .align_time_window(start, end, &time_window_expr)
                    .unwrap();
                assert_eq!(expected, actual);
            }
        }
    }
}