flow/batching_mode/
state.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Batching mode task state, which changes frequently
16
17use std::collections::BTreeMap;
18use std::time::Duration;
19
20use common_telemetry::debug;
21use common_telemetry::tracing::warn;
22use common_time::Timestamp;
23use datatypes::value::Value;
24use session::context::QueryContextRef;
25use snafu::{OptionExt, ResultExt};
26use tokio::sync::oneshot;
27use tokio::time::Instant;
28
29use crate::batching_mode::task::BatchingTask;
30use crate::batching_mode::time_window::TimeWindowExpr;
31use crate::batching_mode::MIN_REFRESH_DURATION;
32use crate::error::{DatatypesSnafu, InternalSnafu, TimeSnafu, UnexpectedSnafu};
33use crate::{Error, FlowId};
34
/// The state of the [`BatchingTask`].
///
/// Holds the query context, timing information about the last executed query, and the
/// set of dirty time windows that still need to be refreshed.
#[derive(Debug)]
pub struct TaskState {
    /// Query context
    pub(crate) query_ctx: QueryContextRef,
    /// Time at which the last query completed (initialised to the creation time of the state)
    last_update_time: Instant,
    /// Duration the last query took (zero until a query has completed)
    last_query_duration: Duration,
    /// Dirty time windows that need to be updated,
    /// stored as a mapping of `start -> end`
    pub(crate) dirty_time_windows: DirtyTimeWindows,
    /// Current execution state; starts as `ExecState::Idle` and is reset to it
    /// by `after_query_exec`
    exec_state: ExecState,
    /// Shutdown receiver
    pub(crate) shutdown_rx: oneshot::Receiver<()>,
    /// Task handle, `None` when the state is created
    pub(crate) task_handle: Option<tokio::task::JoinHandle<()>>,
}
53impl TaskState {
54    pub fn new(query_ctx: QueryContextRef, shutdown_rx: oneshot::Receiver<()>) -> Self {
55        Self {
56            query_ctx,
57            last_update_time: Instant::now(),
58            last_query_duration: Duration::from_secs(0),
59            dirty_time_windows: Default::default(),
60            exec_state: ExecState::Idle,
61            shutdown_rx,
62            task_handle: None,
63        }
64    }
65
66    /// called after last query is done
67    /// `is_succ` indicate whether the last query is successful
68    pub fn after_query_exec(&mut self, elapsed: Duration, _is_succ: bool) {
69        self.exec_state = ExecState::Idle;
70        self.last_query_duration = elapsed;
71        self.last_update_time = Instant::now();
72    }
73
74    /// wait for at least `last_query_duration`, at most `max_timeout` to start next query
75    ///
76    /// if have more dirty time window, exec next query immediately
77    pub fn get_next_start_query_time(
78        &self,
79        flow_id: FlowId,
80        max_timeout: Option<Duration>,
81    ) -> Instant {
82        let next_duration = max_timeout
83            .unwrap_or(self.last_query_duration)
84            .min(self.last_query_duration);
85        let next_duration = next_duration.max(MIN_REFRESH_DURATION);
86
87        // if have dirty time window, execute immediately to clean dirty time window
88        if self.dirty_time_windows.windows.is_empty() {
89            self.last_update_time + next_duration
90        } else {
91            debug!(
92                "Flow id = {}, still have {} dirty time window({:?}), execute immediately",
93                flow_id,
94                self.dirty_time_windows.windows.len(),
95                self.dirty_time_windows.windows
96            );
97            Instant::now()
98        }
99    }
100}
101
/// Keeps a record of dirty time windows, i.e. time windows that have had new data inserted
/// since the last query.
#[derive(Debug, Clone, Default)]
pub struct DirtyTimeWindows {
    /// Maps each window's `start` to its `end`, ordered by `start`.
    /// `end` is exclusive and optional; when it is missing, `merge_dirty_time_windows`
    /// substitutes `start + window_size` while comparing the window with its neighbour.
    /// Overlapping windows are combined by `merge_dirty_time_windows`.
    windows: BTreeMap<Timestamp, Option<Timestamp>>,
}
110
111impl DirtyTimeWindows {
    /// Time window merge distance, expressed in multiples of the window size.
    ///
    /// `merge_dirty_time_windows` merges two windows when the gap between the end of the
    /// earlier one and the start of the later one is at most `window_size * MERGE_DIST`.
    ///
    /// TODO(discord9): make those configurable
    const MERGE_DIST: i32 = 3;

    /// Maximum number of filters allowed in a single query
    ///
    /// `gen_filter_exprs` consumes at most this many time windows per call.
    const MAX_FILTER_NUM: usize = 20;
119
120    /// Add lower bounds to the dirty time windows. Upper bounds are ignored.
121    ///
122    /// # Arguments
123    ///
124    /// * `lower_bounds` - An iterator of lower bounds to be added.
125    pub fn add_lower_bounds(&mut self, lower_bounds: impl Iterator<Item = Timestamp>) {
126        for lower_bound in lower_bounds {
127            let entry = self.windows.entry(lower_bound);
128            entry.or_insert(None);
129        }
130    }
131
132    pub fn add_window(&mut self, start: Timestamp, end: Option<Timestamp>) {
133        self.windows.insert(start, end);
134    }
135
136    /// Clean all dirty time windows, useful when can't found time window expr
137    pub fn clean(&mut self) {
138        self.windows.clear();
139    }
140
141    /// Generate all filter expressions consuming all time windows
142    pub fn gen_filter_exprs(
143        &mut self,
144        col_name: &str,
145        expire_lower_bound: Option<Timestamp>,
146        window_size: chrono::Duration,
147        flow_id: FlowId,
148        task_ctx: Option<&BatchingTask>,
149    ) -> Result<Option<datafusion_expr::Expr>, Error> {
150        debug!(
151            "expire_lower_bound: {:?}, window_size: {:?}",
152            expire_lower_bound.map(|t| t.to_iso8601_string()),
153            window_size
154        );
155        self.merge_dirty_time_windows(window_size, expire_lower_bound)?;
156
157        if self.windows.len() > Self::MAX_FILTER_NUM {
158            let first_time_window = self.windows.first_key_value();
159            let last_time_window = self.windows.last_key_value();
160
161            if let Some(task_ctx) = task_ctx {
162                warn!(
163                "Flow id = {:?}, too many time windows: {}, only the first {} are taken for this query, the group by expression might be wrong. Time window expr={:?}, expire_after={:?}, first_time_window={:?}, last_time_window={:?}, the original query: {:?}",
164                task_ctx.config.flow_id,
165                self.windows.len(),
166                Self::MAX_FILTER_NUM,
167                task_ctx.config.time_window_expr,
168                task_ctx.config.expire_after,
169                first_time_window,
170                last_time_window,
171                task_ctx.config.query
172            );
173            } else {
174                warn!("Flow id = {:?}, too many time windows: {}, only the first {} are taken for this query, the group by expression might be wrong. first_time_window={:?}, last_time_window={:?}",
175                flow_id,
176                self.windows.len(),
177                Self::MAX_FILTER_NUM,
178                first_time_window,
179                last_time_window
180                )
181            }
182        }
183
184        // get the first `MAX_FILTER_NUM` time windows
185        let nth = self
186            .windows
187            .iter()
188            .nth(Self::MAX_FILTER_NUM)
189            .map(|(key, _)| *key);
190        let first_nth = {
191            if let Some(nth) = nth {
192                let mut after = self.windows.split_off(&nth);
193                std::mem::swap(&mut self.windows, &mut after);
194
195                after
196            } else {
197                std::mem::take(&mut self.windows)
198            }
199        };
200
201        let mut expr_lst = vec![];
202        for (start, end) in first_nth.into_iter() {
203            // align using time window exprs
204            let (start, end) = if let Some(ctx) = task_ctx {
205                let Some(time_window_expr) = &ctx.config.time_window_expr else {
206                    UnexpectedSnafu {
207                        reason: "time_window_expr is not set",
208                    }
209                    .fail()?
210                };
211                self.align_time_window(start, end, time_window_expr)?
212            } else {
213                (start, end)
214            };
215            debug!(
216                "Time window start: {:?}, end: {:?}",
217                start.to_iso8601_string(),
218                end.map(|t| t.to_iso8601_string())
219            );
220
221            use datafusion_expr::{col, lit};
222            let lower = to_df_literal(start)?;
223            let upper = end.map(to_df_literal).transpose()?;
224            let expr = if let Some(upper) = upper {
225                col(col_name)
226                    .gt_eq(lit(lower))
227                    .and(col(col_name).lt(lit(upper)))
228            } else {
229                col(col_name).gt_eq(lit(lower))
230            };
231            expr_lst.push(expr);
232        }
233        let expr = expr_lst.into_iter().reduce(|a, b| a.or(b));
234        Ok(expr)
235    }
236
237    fn align_time_window(
238        &self,
239        start: Timestamp,
240        end: Option<Timestamp>,
241        time_window_expr: &TimeWindowExpr,
242    ) -> Result<(Timestamp, Option<Timestamp>), Error> {
243        let align_start = time_window_expr.eval(start)?.0.context(UnexpectedSnafu {
244            reason: format!(
245                "Failed to align start time {:?} with time window expr {:?}",
246                start, time_window_expr
247            ),
248        })?;
249        let align_end = end
250            .and_then(|end| {
251                time_window_expr
252                    .eval(end)
253                    // if after aligned, end is the same, then use end(because it's already aligned) else use aligned end
254                    .map(|r| if r.0 == Some(end) { r.0 } else { r.1 })
255                    .transpose()
256            })
257            .transpose()?;
258        Ok((align_start, align_end))
259    }
260
261    /// Merge time windows that overlaps or get too close
262    pub fn merge_dirty_time_windows(
263        &mut self,
264        window_size: chrono::Duration,
265        expire_lower_bound: Option<Timestamp>,
266    ) -> Result<(), Error> {
267        if self.windows.is_empty() {
268            return Ok(());
269        }
270
271        let mut new_windows = BTreeMap::new();
272
273        let std_window_size = window_size.to_std().map_err(|e| {
274            InternalSnafu {
275                reason: e.to_string(),
276            }
277            .build()
278        })?;
279
280        // previous time window
281        let mut prev_tw = None;
282        for (lower_bound, upper_bound) in std::mem::take(&mut self.windows) {
283            // filter out expired time window
284            if let Some(expire_lower_bound) = expire_lower_bound {
285                if lower_bound < expire_lower_bound {
286                    continue;
287                }
288            }
289
290            let Some(prev_tw) = &mut prev_tw else {
291                prev_tw = Some((lower_bound, upper_bound));
292                continue;
293            };
294
295            // if cur.lower - prev.upper <= window_size * MERGE_DIST, merge
296            // this also deal with overlap windows because cur.lower > prev.lower is always true
297            let prev_upper = prev_tw
298                .1
299                .unwrap_or(prev_tw.0.add_duration(std_window_size).context(TimeSnafu)?);
300            prev_tw.1 = Some(prev_upper);
301
302            let cur_upper = upper_bound.unwrap_or(
303                lower_bound
304                    .add_duration(std_window_size)
305                    .context(TimeSnafu)?,
306            );
307
308            if lower_bound
309                .sub(&prev_upper)
310                .map(|dist| dist <= window_size * Self::MERGE_DIST)
311                .unwrap_or(false)
312            {
313                prev_tw.1 = Some(cur_upper);
314            } else {
315                new_windows.insert(prev_tw.0, prev_tw.1);
316                *prev_tw = (lower_bound, Some(cur_upper));
317            }
318        }
319
320        if let Some(prev_tw) = prev_tw {
321            new_windows.insert(prev_tw.0, prev_tw.1);
322        }
323
324        self.windows = new_windows;
325
326        Ok(())
327    }
328}
329
330fn to_df_literal(value: Timestamp) -> Result<datafusion_common::ScalarValue, Error> {
331    let value = Value::from(value);
332    let value = value
333        .try_to_scalar_value(&value.data_type())
334        .with_context(|_| DatatypesSnafu {
335            extra: format!("Failed to convert to scalar value: {}", value),
336        })?;
337    Ok(value)
338}
339
/// Execution state tracked by [`TaskState`].
#[derive(Debug, Clone)]
enum ExecState {
    /// Initial state of a `TaskState`; also the state set by `TaskState::after_query_exec`.
    Idle,
    /// Presumably marks that a query is currently running; nothing in this file sets
    /// this variant — TODO confirm where (or whether) it is used.
    Executing,
}
345
346#[cfg(test)]
347mod test {
348    use pretty_assertions::assert_eq;
349    use session::context::QueryContext;
350
351    use super::*;
352    use crate::batching_mode::time_window::find_time_window_expr;
353    use crate::batching_mode::utils::sql_to_df_plan;
354    use crate::test_utils::create_test_query_engine;
355
    /// Table-driven test for `merge_dirty_time_windows` and `gen_filter_exprs`.
    ///
    /// Each case is `(lower bounds, (window size, expire lower bound), expected merged
    /// windows, expected filter expression rendered as SQL)`.
    #[test]
    fn test_merge_dirty_time_windows() {
        let testcases = vec![
            // just enough to merge: the gap between the windows is exactly `MERGE_DIST` window sizes
            (
                vec![
                    Timestamp::new_second(0),
                    Timestamp::new_second((1 + DirtyTimeWindows::MERGE_DIST as i64) * 5 * 60),
                ],
                (chrono::Duration::seconds(5 * 60), None),
                BTreeMap::from([(
                    Timestamp::new_second(0),
                    Some(Timestamp::new_second(
                        (2 + DirtyTimeWindows::MERGE_DIST as i64) * 5 * 60,
                    )),
                )]),
                Some(
                    "((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:25:00' AS TIMESTAMP)))",
                )
            ),
            // separate time window: the gap is one window size larger than the merge distance
            (
                vec![
                    Timestamp::new_second(0),
                    Timestamp::new_second((2 + DirtyTimeWindows::MERGE_DIST as i64) * 5 * 60),
                ],
                (chrono::Duration::seconds(5 * 60), None),
                BTreeMap::from([
                    (
                        Timestamp::new_second(0),
                        Some(Timestamp::new_second(5 * 60)),
                    ),
                    (
                        Timestamp::new_second((2 + DirtyTimeWindows::MERGE_DIST as i64) * 5 * 60),
                        Some(Timestamp::new_second(
                            (3 + DirtyTimeWindows::MERGE_DIST as i64) * 5 * 60,
                        )),
                    ),
                ]),
                Some(
                    "(((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:05:00' AS TIMESTAMP))) OR ((ts >= CAST('1970-01-01 00:25:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:30:00' AS TIMESTAMP))))",
                )
            ),
            // overlapping: the gap is smaller than the merge distance
            (
                vec![
                    Timestamp::new_second(0),
                    Timestamp::new_second((DirtyTimeWindows::MERGE_DIST as i64) * 5 * 60),
                ],
                (chrono::Duration::seconds(5 * 60), None),
                BTreeMap::from([(
                    Timestamp::new_second(0),
                    Some(Timestamp::new_second(
                        (1 + DirtyTimeWindows::MERGE_DIST as i64) * 5 * 60,
                    )),
                )]),
                Some(
                    "((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:20:00' AS TIMESTAMP)))",
                )
            ),
            // complex overlapping: three windows chained into a single one
            (
                vec![
                    Timestamp::new_second(0),
                    Timestamp::new_second((DirtyTimeWindows::MERGE_DIST as i64) * 3),
                    Timestamp::new_second((DirtyTimeWindows::MERGE_DIST as i64) * 3 * 2),
                ],
                (chrono::Duration::seconds(3), None),
                BTreeMap::from([(
                    Timestamp::new_second(0),
                    Some(Timestamp::new_second(
                        (DirtyTimeWindows::MERGE_DIST as i64) * 7
                    )),
                )]),
                Some(
                    "((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:00:21' AS TIMESTAMP)))",
                )
            ),
            // expired: every window starts before the expire lower bound, so nothing is left
            (
                vec![
                    Timestamp::new_second(0),
                    Timestamp::new_second((DirtyTimeWindows::MERGE_DIST as i64) * 5 * 60),
                ],
                (
                    chrono::Duration::seconds(5 * 60),
                    Some(Timestamp::new_second(
                        (DirtyTimeWindows::MERGE_DIST as i64) * 6 * 60,
                    )),
                ),
                BTreeMap::from([]),
                None
            ),
        ];
        for (lower_bounds, (window_size, expire_lower_bound), expected, expected_filter_expr) in
            testcases
        {
            let mut dirty = DirtyTimeWindows::default();
            dirty.add_lower_bounds(lower_bounds.into_iter());
            // check the merged windows first
            dirty
                .merge_dirty_time_windows(window_size, expire_lower_bound)
                .unwrap();
            assert_eq!(expected, dirty.windows);
            // then check the filter generated from them (no task context, so no alignment)
            let filter_expr = dirty
                .gen_filter_exprs("ts", expire_lower_bound, window_size, 0, None)
                .unwrap();

            // render the expression as SQL so it can be compared with the expected string
            let unparser = datafusion::sql::unparser::Unparser::default();
            let to_sql = filter_expr
                .as_ref()
                .map(|e| unparser.expr_to_sql(e).unwrap().to_string());
            assert_eq!(expected_filter_expr, to_sql.as_deref());
        }
    }
470
    /// Checks that `align_time_window` aligns window starts and ends to the boundaries
    /// of the time window expression found in the test query.
    #[tokio::test]
    async fn test_align_time_window() {
        type TimeWindow = (Timestamp, Option<Timestamp>);
        struct TestCase {
            /// query the time window expression is extracted from
            sql: String,
            /// pairs of `(window before alignment, expected window after alignment)`
            aligns: Vec<(TimeWindow, TimeWindow)>,
        }
        let testcases: Vec<TestCase> = vec![TestCase{
            sql: "SELECT date_bin(INTERVAL '5 second', ts) AS time_window FROM numbers_with_ts GROUP BY time_window;".to_string(),
            aligns: vec![
                // start is aligned down to the window boundary, a missing end stays missing
                ((Timestamp::new_second(3), None), (Timestamp::new_second(0), None)),
                ((Timestamp::new_second(8), None), (Timestamp::new_second(5), None)),
                // an end that is already on a boundary is kept
                ((Timestamp::new_second(8), Some(Timestamp::new_second(10))), (Timestamp::new_second(5), Some(Timestamp::new_second(10)))),
                // an end inside a window is moved to that window's upper bound
                ((Timestamp::new_second(8), Some(Timestamp::new_second(9))), (Timestamp::new_second(5), Some(Timestamp::new_second(10)))),
            ],
        }];

        let query_engine = create_test_query_engine();
        let ctx = QueryContext::arc();
        for TestCase { sql, aligns } in testcases {
            let plan = sql_to_df_plan(ctx.clone(), query_engine.clone(), &sql, true)
                .await
                .unwrap();

            let (column_name, time_window_expr, _, df_schema) = find_time_window_expr(
                &plan,
                query_engine.engine_state().catalog_manager().clone(),
                ctx.clone(),
            )
            .await
            .unwrap();

            // the query is expected to contain a time window expression, hence the double unwrap
            let time_window_expr = time_window_expr
                .map(|expr| {
                    TimeWindowExpr::from_expr(
                        &expr,
                        &column_name,
                        &df_schema,
                        &query_engine.engine_state().session_state(),
                    )
                })
                .transpose()
                .unwrap()
                .unwrap();

            let dirty = DirtyTimeWindows::default();
            for (before_align, expected_after_align) in aligns {
                let after_align = dirty
                    .align_time_window(before_align.0, before_align.1, &time_window_expr)
                    .unwrap();
                assert_eq!(expected_after_align, after_align);
            }
        }
    }
525}