1use std::collections::BTreeMap;
18use std::time::Duration;
19
20use common_telemetry::debug;
21use common_telemetry::tracing::warn;
22use common_time::Timestamp;
23use datatypes::value::Value;
24use session::context::QueryContextRef;
25use snafu::{ensure, OptionExt, ResultExt};
26use tokio::sync::oneshot;
27use tokio::time::Instant;
28
29use crate::batching_mode::task::BatchingTask;
30use crate::batching_mode::time_window::TimeWindowExpr;
31use crate::error::{DatatypesSnafu, InternalSnafu, TimeSnafu, UnexpectedSnafu};
32use crate::metrics::{
33 METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_CNT, METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_SIZE,
34 METRIC_FLOW_BATCHING_ENGINE_STALLED_WINDOW_SIZE,
35};
36use crate::{Error, FlowId};
37
/// Mutable bookkeeping for one batching-mode flow task: when it last ran, how long the last
/// query took, and which time ranges are still dirty.
#[derive(Debug)]
pub struct TaskState {
    /// Query context supplied to [`TaskState::new`].
    pub(crate) query_ctx: QueryContextRef,
    /// Set to `Instant::now()` on construction and again by `after_query_exec`.
    last_update_time: Instant,
    /// Duration passed to the most recent `after_query_exec`; zero until the first call.
    last_query_duration: Duration,
    /// Time ranges that still have to be covered by a query.
    pub(crate) dirty_time_windows: DirtyTimeWindows,
    /// `Idle` on construction and reset to `Idle` by `after_query_exec`.
    exec_state: ExecState,
    /// Receiving half of a one-shot channel.
    /// NOTE(review): presumably fires when the task should shut down — confirm with the task loop.
    pub(crate) shutdown_rx: oneshot::Receiver<()>,
    /// Join handle of a spawned tokio task; `None` on construction.
    /// NOTE(review): presumably the task's own execution loop — confirm with the spawner.
    pub(crate) task_handle: Option<tokio::task::JoinHandle<()>>,
}
56impl TaskState {
57 pub fn new(query_ctx: QueryContextRef, shutdown_rx: oneshot::Receiver<()>) -> Self {
58 Self {
59 query_ctx,
60 last_update_time: Instant::now(),
61 last_query_duration: Duration::from_secs(0),
62 dirty_time_windows: Default::default(),
63 exec_state: ExecState::Idle,
64 shutdown_rx,
65 task_handle: None,
66 }
67 }
68
69 pub fn after_query_exec(&mut self, elapsed: Duration, _is_succ: bool) {
72 self.exec_state = ExecState::Idle;
73 self.last_query_duration = elapsed;
74 self.last_update_time = Instant::now();
75 }
76
77 pub fn get_next_start_query_time(
88 &self,
89 flow_id: FlowId,
90 time_window_size: &Option<Duration>,
91 min_refresh_duration: Duration,
92 max_timeout: Option<Duration>,
93 max_filter_num_per_query: usize,
94 ) -> Instant {
95 let lower = time_window_size.unwrap_or(min_refresh_duration);
97 let next_duration = self.last_query_duration.max(lower);
98 let next_duration = if let Some(max_timeout) = max_timeout {
99 next_duration.min(max_timeout)
100 } else {
101 next_duration
102 };
103
104 let cur_dirty_window_size = self.dirty_time_windows.window_size();
105 let max_query_update_range = (*time_window_size)
107 .unwrap_or_default()
108 .mul_f64(max_filter_num_per_query as f64);
109 if cur_dirty_window_size < max_query_update_range {
112 self.last_update_time + next_duration
113 } else {
114 debug!(
117 "Flow id = {}, still have too many {} dirty time window({:?}), execute immediately",
118 flow_id,
119 self.dirty_time_windows.windows.len(),
120 self.dirty_time_windows.windows
121 );
122 Instant::now()
123 }
124 }
125}
126
/// Ordered set of time ranges that still need to be queried.
#[derive(Debug, Clone)]
pub struct DirtyTimeWindows {
    /// Maps a window's start to its optional end. The end is used as an exclusive bound when
    /// filters are generated (`col >= start AND col < end`); a `None` end is treated as
    /// `start + window_size` by the code that consumes the windows.
    windows: BTreeMap<Timestamp, Option<Timestamp>>,
    /// `gen_filter_exprs` logs a warning when more windows than this remain after merging.
    /// `Default` uses 20.
    max_filter_num_per_query: usize,
    /// Two windows are merged when the gap between them is at most this many `window_size`s.
    /// `Default` uses 3.
    time_window_merge_threshold: usize,
}
140
141impl DirtyTimeWindows {
142 pub fn new(max_filter_num_per_query: usize, time_window_merge_threshold: usize) -> Self {
143 Self {
144 windows: BTreeMap::new(),
145 max_filter_num_per_query,
146 time_window_merge_threshold,
147 }
148 }
149}
150
151impl Default for DirtyTimeWindows {
152 fn default() -> Self {
153 Self {
154 windows: BTreeMap::new(),
155 max_filter_num_per_query: 20,
156 time_window_merge_threshold: 3,
157 }
158 }
159}
160
161impl DirtyTimeWindows {
    /// Merge distance, counted in time windows.
    ///
    /// NOTE(review): in the visible part of this file only the tests read this constant;
    /// `merge_dirty_time_windows` uses the `time_window_merge_threshold` field instead.
    pub const MERGE_DIST: i32 = 3;
166
167 pub fn add_lower_bounds(&mut self, lower_bounds: impl Iterator<Item = Timestamp>) {
173 for lower_bound in lower_bounds {
174 let entry = self.windows.entry(lower_bound);
175 entry.or_insert(None);
176 }
177 }
178
179 pub fn window_size(&self) -> Duration {
180 let mut ret = Duration::from_secs(0);
181 for (start, end) in &self.windows {
182 if let Some(end) = end {
183 if let Some(duration) = end.sub(start) {
184 ret += duration.to_std().unwrap_or_default();
185 }
186 }
187 }
188 ret
189 }
190
191 pub fn add_window(&mut self, start: Timestamp, end: Option<Timestamp>) {
192 self.windows.insert(start, end);
193 }
194
195 pub fn add_windows(&mut self, time_ranges: Vec<(Timestamp, Timestamp)>) {
196 for (start, end) in time_ranges {
197 self.windows.insert(start, Some(end));
198 }
199 }
200
201 pub fn clean(&mut self) {
203 self.windows.clear();
204 }
205
206 pub fn len(&self) -> usize {
208 self.windows.len()
209 }
210
211 pub fn effective_count(&self, window_size: &Duration) -> usize {
214 if self.windows.is_empty() {
215 return 0;
216 }
217 let window_size =
218 chrono::Duration::from_std(*window_size).unwrap_or(chrono::Duration::zero());
219 let total_window_time_range =
220 self.windows
221 .iter()
222 .fold(chrono::Duration::zero(), |acc, (start, end)| {
223 if let Some(end) = end {
224 acc + end.sub(start).unwrap_or(chrono::Duration::zero())
225 } else {
226 acc + window_size
227 }
228 });
229
230 if window_size.num_seconds() == 0 {
232 0
233 } else {
234 (total_window_time_range.num_seconds() / window_size.num_seconds()) as usize
235 }
236 }
237
    /// Selects the dirty windows for the next query, removes them from the dirty set and
    /// returns a filter expression covering them.
    ///
    /// Steps, in order:
    /// 1. Rejects a `window_size` shorter than one second.
    /// 2. Calls `merge_dirty_time_windows`, which also drops windows starting before
    ///    `expire_lower_bound`.
    /// 3. Logs a warning when more than `max_filter_num_per_query` windows remain.
    /// 4. Walks the windows in ascending start order, selecting at most `window_cnt` windows
    ///    and at most `window_size * window_cnt` of total range. A window that does not fit is
    ///    split at a multiple of `window_size`; the tail stays dirty.
    /// 5. Reports the selected count, selected range and remaining (stalled) range as metrics
    ///    labelled with `flow_id`.
    /// 6. When `task_ctx` is given, aligns each selected range with the task's time window
    ///    expression; then builds `col_name >= start AND col_name < end` per range and ORs
    ///    the predicates together.
    ///
    /// # Returns
    /// `Ok(None)` when no range was selected, otherwise the combined expression together with
    /// the selected `(start, end)` ranges.
    ///
    /// # Errors
    /// - `Unexpected` when `window_size` is below one second, or when `task_ctx` is given but
    ///   has no `time_window_expr` (only checked if at least one range was selected).
    /// - `Time` when adding a duration to a timestamp fails.
    /// - `Internal` when `window_size` cannot be converted to a `std` duration.
    /// - Any error from `merge_dirty_time_windows`, `align_time_window` or `to_df_literal`.
    pub fn gen_filter_exprs(
        &mut self,
        col_name: &str,
        expire_lower_bound: Option<Timestamp>,
        window_size: chrono::Duration,
        window_cnt: usize,
        flow_id: FlowId,
        task_ctx: Option<&BatchingTask>,
    ) -> Result<Option<FilterExprInfo>, Error> {
        // The splitting logic below divides by `window_size.num_seconds()`.
        ensure!(
            window_size.num_seconds() > 0,
            UnexpectedSnafu {
                reason: "window_size is zero, can't generate filter exprs",
            }
        );

        debug!(
            "expire_lower_bound: {:?}, window_size: {:?}",
            expire_lower_bound.map(|t| t.to_iso8601_string()),
            window_size
        );
        self.merge_dirty_time_windows(window_size, expire_lower_bound)?;

        // NOTE(review): the message reports `max_filter_num_per_query` as the number of windows
        // taken, while the selection loop below is limited by `window_cnt` — confirm callers
        // always pass the same value.
        if self.windows.len() > self.max_filter_num_per_query {
            let first_time_window = self.windows.first_key_value();
            let last_time_window = self.windows.last_key_value();

            if let Some(task_ctx) = task_ctx {
                warn!(
                    "Flow id = {:?}, too many time windows: {}, only the first {} are taken for this query, the group by expression might be wrong. Time window expr={:?}, expire_after={:?}, first_time_window={:?}, last_time_window={:?}, the original query: {:?}",
                    task_ctx.config.flow_id,
                    self.windows.len(),
                    self.max_filter_num_per_query,
                    task_ctx.config.time_window_expr,
                    task_ctx.config.expire_after,
                    first_time_window,
                    last_time_window,
                    task_ctx.config.query
                );
            } else {
                warn!("Flow id = {:?}, too many time windows: {}, only the first {} are taken for this query, the group by expression might be wrong. first_time_window={:?}, last_time_window={:?}",
                    flow_id,
                    self.windows.len(),
                    self.max_filter_num_per_query,
                    first_time_window,
                    last_time_window
                )
            }
        }

        // Upper limit on the total time range a single query may cover.
        // NOTE(review): `window_cnt as i32` truncates for counts above `i32::MAX`.
        let max_time_range = window_size * window_cnt as i32;

        // `to_be_query` collects the selected ranges; `new_windows` starts as a copy of the
        // dirty set and has the selected ranges removed from it.
        let mut to_be_query = BTreeMap::new();
        let mut new_windows = self.windows.clone();
        let mut cur_time_range = chrono::Duration::zero();
        for (idx, (start, end)) in self.windows.iter().enumerate() {
            // `to_std` cannot fail here: `window_size` was checked to be positive above.
            let first_end = start
                .add_duration(window_size.to_std().unwrap())
                .context(TimeSnafu)?;
            // A window without an explicit end spans exactly one `window_size`.
            let end = end.unwrap_or(first_end);

            // Stop once the range budget is used up.
            if cur_time_range >= max_time_range {
                break;
            }

            // Stop once `window_cnt` windows have been visited (skipped ones count too).
            if idx >= window_cnt {
                break;
            }

            // A window whose length cannot be computed is skipped and stays dirty.
            let Some(x) = end.sub(start) else {
                continue;
            };
            if cur_time_range + x <= max_time_range {
                // The whole window fits into the remaining budget.
                to_be_query.insert(*start, Some(end));
                new_windows.remove(start);
                cur_time_range += x;
            } else {
                // The window is too long: take only a prefix that is a multiple of
                // `window_size` and leave the rest dirty.
                let surplus = max_time_range - cur_time_range;
                if surplus.num_seconds() <= window_size.num_seconds() {
                    // Not more than one window of budget left; do not split.
                    break;
                }
                let times = surplus.num_seconds() / window_size.num_seconds();

                // NOTE(review): `times as i32` truncates if the quotient exceeds `i32::MAX`.
                let split_offset = window_size * times as i32;
                let split_at = start
                    .add_duration(split_offset.to_std().unwrap())
                    .context(TimeSnafu)?;
                to_be_query.insert(*start, Some(split_at));

                // Replace the original window by its unqueried tail.
                new_windows.remove(start);
                new_windows.insert(split_at, Some(end));
                cur_time_range += split_offset;
                break;
            }
        }

        self.windows = new_windows;

        // Number of ranges selected for this query.
        METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_CNT
            .with_label_values(&[flow_id.to_string().as_str()])
            .observe(to_be_query.len() as f64);

        // Total length, in seconds, of the selected ranges.
        let full_time_range = to_be_query
            .iter()
            .fold(chrono::Duration::zero(), |acc, (start, end)| {
                if let Some(end) = end {
                    acc + end.sub(start).unwrap_or(chrono::Duration::zero())
                } else {
                    acc + window_size
                }
            })
            .num_seconds() as f64;
        METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_SIZE
            .with_label_values(&[flow_id.to_string().as_str()])
            .observe(full_time_range);

        // Total length of the ranges that remain dirty after this selection.
        let stalled_time_range =
            self.windows
                .iter()
                .fold(chrono::Duration::zero(), |acc, (start, end)| {
                    if let Some(end) = end {
                        acc + end.sub(start).unwrap_or(chrono::Duration::zero())
                    } else {
                        acc + window_size
                    }
                });

        METRIC_FLOW_BATCHING_ENGINE_STALLED_WINDOW_SIZE
            .with_label_values(&[flow_id.to_string().as_str()])
            .observe(stalled_time_range.num_seconds() as f64);

        let std_window_size = window_size.to_std().map_err(|e| {
            InternalSnafu {
                reason: e.to_string(),
            }
            .build()
        })?;

        let mut expr_lst = vec![];
        let mut time_ranges = vec![];
        for (start, end) in to_be_query.into_iter() {
            // With a task context the range is aligned using the task's time window
            // expression; without one it is used as is.
            let (start, end) = if let Some(ctx) = task_ctx {
                let Some(time_window_expr) = &ctx.config.time_window_expr else {
                    UnexpectedSnafu {
                        reason: "time_window_expr is not set",
                    }
                    .fail()?
                };
                self.align_time_window(start, end, time_window_expr)?
            } else {
                (start, end)
            };
            // The fallback is computed even when `end` is present, so an overflow of
            // `start + window_size` is reported in both cases.
            let end = end.unwrap_or(start.add_duration(std_window_size).context(TimeSnafu)?);
            time_ranges.push((start, end));

            debug!(
                "Time window start: {:?}, end: {:?}",
                start.to_iso8601_string(),
                end.to_iso8601_string()
            );

            use datafusion_expr::{col, lit};
            let lower = to_df_literal(start)?;
            let upper = to_df_literal(end)?;
            // Half-open range: start inclusive, end exclusive.
            let expr = col(col_name)
                .gt_eq(lit(lower))
                .and(col(col_name).lt(lit(upper)));
            expr_lst.push(expr);
        }
        // OR all per-range predicates together; `None` when nothing was selected.
        let expr = expr_lst.into_iter().reduce(|a, b| a.or(b));
        let ret = expr.map(|expr| FilterExprInfo {
            expr,
            col_name: col_name.to_string(),
            time_ranges,
            window_size,
        });
        Ok(ret)
    }
429
430 fn align_time_window(
431 &self,
432 start: Timestamp,
433 end: Option<Timestamp>,
434 time_window_expr: &TimeWindowExpr,
435 ) -> Result<(Timestamp, Option<Timestamp>), Error> {
436 let align_start = time_window_expr.eval(start)?.0.context(UnexpectedSnafu {
437 reason: format!(
438 "Failed to align start time {:?} with time window expr {:?}",
439 start, time_window_expr
440 ),
441 })?;
442 let align_end = end
443 .and_then(|end| {
444 time_window_expr
445 .eval(end)
446 .map(|r| if r.0 == Some(end) { r.0 } else { r.1 })
448 .transpose()
449 })
450 .transpose()?;
451 Ok((align_start, align_end))
452 }
453
454 pub fn merge_dirty_time_windows(
458 &mut self,
459 window_size: chrono::Duration,
460 expire_lower_bound: Option<Timestamp>,
461 ) -> Result<(), Error> {
462 if self.windows.is_empty() {
463 return Ok(());
464 }
465
466 let mut new_windows = BTreeMap::new();
467
468 let std_window_size = window_size.to_std().map_err(|e| {
469 InternalSnafu {
470 reason: e.to_string(),
471 }
472 .build()
473 })?;
474
475 let mut prev_tw = None;
477 for (lower_bound, upper_bound) in std::mem::take(&mut self.windows) {
478 if let Some(expire_lower_bound) = expire_lower_bound {
480 if lower_bound < expire_lower_bound {
481 continue;
482 }
483 }
484
485 let Some(prev_tw) = &mut prev_tw else {
486 prev_tw = Some((lower_bound, upper_bound));
487 continue;
488 };
489
490 let prev_upper = prev_tw
493 .1
494 .unwrap_or(prev_tw.0.add_duration(std_window_size).context(TimeSnafu)?);
495 prev_tw.1 = Some(prev_upper);
496
497 let cur_upper = upper_bound.unwrap_or(
498 lower_bound
499 .add_duration(std_window_size)
500 .context(TimeSnafu)?,
501 );
502
503 if lower_bound
504 .sub(&prev_upper)
505 .map(|dist| dist <= window_size * self.time_window_merge_threshold as i32)
506 .unwrap_or(false)
507 {
508 prev_tw.1 = Some(cur_upper);
509 } else {
510 new_windows.insert(prev_tw.0, prev_tw.1);
511 *prev_tw = (lower_bound, Some(cur_upper));
512 }
513 }
514
515 if let Some(prev_tw) = prev_tw {
516 new_windows.insert(prev_tw.0, prev_tw.1);
517 }
518
519 self.windows = new_windows;
520
521 Ok(())
522 }
523}
524
525fn to_df_literal(value: Timestamp) -> Result<datafusion_common::ScalarValue, Error> {
526 let value = Value::from(value);
527 let value = value
528 .try_to_scalar_value(&value.data_type())
529 .with_context(|_| DatatypesSnafu {
530 extra: format!("Failed to convert to scalar value: {}", value),
531 })?;
532 Ok(value)
533}
534
/// Execution status stored in `TaskState`.
#[derive(Debug, Clone)]
enum ExecState {
    /// Initial state, and the state `TaskState::after_query_exec` resets to.
    Idle,
    /// NOTE(review): never constructed in the visible part of this file — presumably marks a
    /// query in flight; confirm before relying on it.
    Executing,
}
540
/// Result of `DirtyTimeWindows::gen_filter_exprs`: the filter plus the ranges it covers.
#[derive(Debug, Clone)]
pub struct FilterExprInfo {
    /// OR of one `col_name >= start AND col_name < end` predicate per entry in `time_ranges`.
    pub expr: datafusion_expr::Expr,
    /// Name of the column the predicates compare against.
    pub col_name: String,
    /// The `(start, end)` ranges the expression was built from, in ascending start order.
    pub time_ranges: Vec<(Timestamp, Timestamp)>,
    /// The window size that was passed to `gen_filter_exprs`.
    pub window_size: chrono::Duration,
}
549
550impl FilterExprInfo {
551 pub fn total_window_length(&self) -> chrono::Duration {
552 self.time_ranges
553 .iter()
554 .fold(chrono::Duration::zero(), |acc, (start, end)| {
555 acc + end.sub(start).unwrap_or(chrono::Duration::zero())
556 })
557 }
558}
559
560#[cfg(test)]
561mod test {
562 use pretty_assertions::assert_eq;
563 use session::context::QueryContext;
564
565 use super::*;
566 use crate::batching_mode::time_window::find_time_window_expr;
567 use crate::batching_mode::utils::sql_to_df_plan;
568 use crate::test_utils::create_test_query_engine;
569
    /// Table-driven test for `merge_dirty_time_windows` followed by `gen_filter_exprs`.
    ///
    /// Each case is `(lower bounds to add, (window size, expire lower bound), expected windows
    /// after merging, expected filter rendered as SQL)`. All cases use the default
    /// `DirtyTimeWindows` (merge threshold 3, at most 20 filters per query).
    #[test]
    fn test_merge_dirty_time_windows() {
        let merge_dist = DirtyTimeWindows::default().time_window_merge_threshold;
        let testcases = vec![
            // Gap between the two windows equals the threshold exactly: they are merged.
            (
                vec![
                    Timestamp::new_second(0),
                    Timestamp::new_second((1 + merge_dist as i64) * 5 * 60),
                ],
                (chrono::Duration::seconds(5 * 60), None),
                BTreeMap::from([(
                    Timestamp::new_second(0),
                    Some(Timestamp::new_second(
                        (2 + merge_dist as i64) * 5 * 60,
                    )),
                )]),
                Some(
                    "((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:25:00' AS TIMESTAMP)))",
                )
            ),
            // Gap is one window larger than the threshold: the windows stay separate.
            (
                vec![
                    Timestamp::new_second(0),
                    Timestamp::new_second((2 + merge_dist as i64) * 5 * 60),
                ],
                (chrono::Duration::seconds(5 * 60), None),
                BTreeMap::from([
                    (
                        Timestamp::new_second(0),
                        Some(Timestamp::new_second(5 * 60)),
                    ),
                    (
                        Timestamp::new_second((2 + merge_dist as i64) * 5 * 60),
                        Some(Timestamp::new_second(
                            (3 + merge_dist as i64) * 5 * 60,
                        )),
                    ),
                ]),
                Some(
                    "(((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:05:00' AS TIMESTAMP))) OR ((ts >= CAST('1970-01-01 00:25:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:30:00' AS TIMESTAMP))))",
                )
            ),
            // Gap is below the threshold: merged.
            (
                vec![
                    Timestamp::new_second(0),
                    Timestamp::new_second((merge_dist as i64) * 5 * 60),
                ],
                (chrono::Duration::seconds(5 * 60), None),
                BTreeMap::from([(
                    Timestamp::new_second(0),
                    Some(Timestamp::new_second(
                        (1 + merge_dist as i64) * 5 * 60,
                    )),
                )]),
                Some(
                    "((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:20:00' AS TIMESTAMP)))",
                )
            ),
            // Three windows, each within the threshold of the previous one: merged as a chain.
            (
                vec![
                    Timestamp::new_second(0),
                    Timestamp::new_second((merge_dist as i64) * 3),
                    Timestamp::new_second((merge_dist as i64) * 3 * 2),
                ],
                (chrono::Duration::seconds(3), None),
                BTreeMap::from([(
                    Timestamp::new_second(0),
                    Some(Timestamp::new_second(
                        (merge_dist as i64) * 7
                    )),
                )]),
                Some(
                    "((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:00:21' AS TIMESTAMP)))",
                )
            ),
            // Twenty adjacent windows plus one beyond the merge distance: two windows remain,
            // and the first alone uses up the range budget so only it appears in the filter.
            (
                Vec::from_iter((0..20).map(|i|Timestamp::new_second(i*3)).chain(std::iter::once(
                    Timestamp::new_second(60 + 3 * (DirtyTimeWindows::MERGE_DIST as i64 + 1)),
                ))),
                (chrono::Duration::seconds(3), None),
                BTreeMap::from([
                    (
                        Timestamp::new_second(0),
                        Some(Timestamp::new_second(
                            60
                        )),
                    ),
                    (
                        Timestamp::new_second(60 + 3 * (DirtyTimeWindows::MERGE_DIST as i64 + 1)),
                        Some(Timestamp::new_second(
                            60 + 3 * (DirtyTimeWindows::MERGE_DIST as i64 + 1) + 3
                        )),
                    )]),
                Some(
                    "((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:01:00' AS TIMESTAMP)))",
                )
            ),
            // Forty adjacent windows merge into one two-minute range, which the filter
            // generation splits down to the one-minute budget.
            (
                Vec::from_iter((0..40).map(|i|Timestamp::new_second(i*3))),
                (chrono::Duration::seconds(3), None),
                BTreeMap::from([
                    (
                        Timestamp::new_second(0),
                        Some(Timestamp::new_second(
                            40 * 3
                        )),
                    )]),
                Some(
                    "((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:01:00' AS TIMESTAMP)))",
                )
            ),
            // A lone 3s window followed by a long merged range: the filter takes the 3s window
            // in full and only the first 57s of the long range.
            (
                Vec::from_iter(std::iter::once(Timestamp::new_second(0)).chain((0..40).map(|i|Timestamp::new_second(20+i*3)))),
                (chrono::Duration::seconds(3), None),
                BTreeMap::from([
                    (
                        Timestamp::new_second(0),
                        Some(Timestamp::new_second(
                            3
                        )),
                    ),(
                        Timestamp::new_second(20),
                        Some(Timestamp::new_second(
                            140
                        )),
                    )]),
                Some(
                    "(((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:00:03' AS TIMESTAMP))) OR ((ts >= CAST('1970-01-01 00:00:20' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:01:17' AS TIMESTAMP))))",
                )
            ),
            // Both windows start before the expire lower bound: everything is dropped.
            (
                vec![
                    Timestamp::new_second(0),
                    Timestamp::new_second((merge_dist as i64) * 5 * 60),
                ],
                (
                    chrono::Duration::seconds(5 * 60),
                    Some(Timestamp::new_second(
                        (merge_dist as i64) * 6 * 60,
                    )),
                ),
                BTreeMap::from([]),
                None
            ),
        ];
        for (lower_bounds, (window_size, expire_lower_bound), expected, expected_filter_expr) in
            testcases
        {
            let mut dirty = DirtyTimeWindows::default();
            dirty.add_lower_bounds(lower_bounds.into_iter());
            dirty
                .merge_dirty_time_windows(window_size, expire_lower_bound)
                .unwrap();
            // Check the merged windows before filter generation consumes them.
            assert_eq!(expected, dirty.windows);
            let filter_expr = dirty
                .gen_filter_exprs(
                    "ts",
                    expire_lower_bound,
                    window_size,
                    dirty.max_filter_num_per_query,
                    0,
                    None,
                )
                .unwrap()
                .map(|e| e.expr);

            // Render the expression as SQL so it can be compared as text.
            let unparser = datafusion::sql::unparser::Unparser::default();
            let to_sql = filter_expr
                .as_ref()
                .map(|e| unparser.expr_to_sql(e).unwrap().to_string());
            assert_eq!(expected_filter_expr, to_sql.as_deref());
        }
    }
753
754 #[tokio::test]
755 async fn test_align_time_window() {
756 type TimeWindow = (Timestamp, Option<Timestamp>);
757 struct TestCase {
758 sql: String,
759 aligns: Vec<(TimeWindow, TimeWindow)>,
760 }
761 let testcases: Vec<TestCase> = vec![TestCase{
762 sql: "SELECT date_bin(INTERVAL '5 second', ts) AS time_window FROM numbers_with_ts GROUP BY time_window;".to_string(),
763 aligns: vec![
764 ((Timestamp::new_second(3), None), (Timestamp::new_second(0), None)),
765 ((Timestamp::new_second(8), None), (Timestamp::new_second(5), None)),
766 ((Timestamp::new_second(8), Some(Timestamp::new_second(10))), (Timestamp::new_second(5), Some(Timestamp::new_second(10)))),
767 ((Timestamp::new_second(8), Some(Timestamp::new_second(9))), (Timestamp::new_second(5), Some(Timestamp::new_second(10)))),
768 ],
769 }];
770
771 let query_engine = create_test_query_engine();
772 let ctx = QueryContext::arc();
773 for TestCase { sql, aligns } in testcases {
774 let plan = sql_to_df_plan(ctx.clone(), query_engine.clone(), &sql, true)
775 .await
776 .unwrap();
777
778 let (column_name, time_window_expr, _, df_schema) = find_time_window_expr(
779 &plan,
780 query_engine.engine_state().catalog_manager().clone(),
781 ctx.clone(),
782 )
783 .await
784 .unwrap();
785
786 let time_window_expr = time_window_expr
787 .map(|expr| {
788 TimeWindowExpr::from_expr(
789 &expr,
790 &column_name,
791 &df_schema,
792 &query_engine.engine_state().session_state(),
793 )
794 })
795 .transpose()
796 .unwrap()
797 .unwrap();
798
799 let dirty = DirtyTimeWindows::default();
800 for (before_align, expected_after_align) in aligns {
801 let after_align = dirty
802 .align_time_window(before_align.0, before_align.1, &time_window_expr)
803 .unwrap();
804 assert_eq!(expected_after_align, after_align);
805 }
806 }
807 }
808}