1use std::collections::BTreeMap;
18use std::time::Duration;
19
20use common_telemetry::debug;
21use common_telemetry::tracing::warn;
22use common_time::Timestamp;
23use datatypes::value::Value;
24use session::context::QueryContextRef;
25use snafu::{ensure, OptionExt, ResultExt};
26use tokio::sync::oneshot;
27use tokio::time::Instant;
28
29use crate::batching_mode::task::BatchingTask;
30use crate::batching_mode::time_window::TimeWindowExpr;
31use crate::batching_mode::MIN_REFRESH_DURATION;
32use crate::error::{DatatypesSnafu, InternalSnafu, TimeSnafu, UnexpectedSnafu};
33use crate::metrics::{
34 METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_CNT, METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_SIZE,
35 METRIC_FLOW_BATCHING_ENGINE_STALLED_WINDOW_SIZE,
36};
37use crate::{Error, FlowId};
38
/// Mutable runtime state tracked for a single batching-mode task.
#[derive(Debug)]
pub struct TaskState {
    /// Query context carried with the task.
    /// NOTE(review): presumably the context queries are executed under — confirm with callers.
    pub(crate) query_ctx: QueryContextRef,
    /// Instant of the last state update: set at construction and refreshed by
    /// [`TaskState::after_query_exec`].
    last_update_time: Instant,
    /// Duration reported for the most recent query (zero until the first one is recorded).
    last_query_duration: Duration,
    /// Time windows that are known to be dirty and still have to be queried.
    pub(crate) dirty_time_windows: DirtyTimeWindows,
    /// Execution state; starts as `Idle` and is reset to `Idle` by
    /// [`TaskState::after_query_exec`].
    exec_state: ExecState,
    /// Receiving half of a oneshot channel.
    /// NOTE(review): by its name this signals task shutdown — confirm with the sender side.
    pub(crate) shutdown_rx: oneshot::Receiver<()>,
    /// Join handle of a spawned tokio task, `None` right after construction.
    pub(crate) task_handle: Option<tokio::task::JoinHandle<()>>,
}
57impl TaskState {
58 pub fn new(query_ctx: QueryContextRef, shutdown_rx: oneshot::Receiver<()>) -> Self {
59 Self {
60 query_ctx,
61 last_update_time: Instant::now(),
62 last_query_duration: Duration::from_secs(0),
63 dirty_time_windows: Default::default(),
64 exec_state: ExecState::Idle,
65 shutdown_rx,
66 task_handle: None,
67 }
68 }
69
70 pub fn after_query_exec(&mut self, elapsed: Duration, _is_succ: bool) {
73 self.exec_state = ExecState::Idle;
74 self.last_query_duration = elapsed;
75 self.last_update_time = Instant::now();
76 }
77
78 pub fn get_next_start_query_time(
89 &self,
90 flow_id: FlowId,
91 time_window_size: &Option<Duration>,
92 max_timeout: Option<Duration>,
93 ) -> Instant {
94 let lower = time_window_size.unwrap_or(MIN_REFRESH_DURATION);
96 let next_duration = self.last_query_duration.max(lower);
97 let next_duration = if let Some(max_timeout) = max_timeout {
98 next_duration.min(max_timeout)
99 } else {
100 next_duration
101 };
102
103 let cur_dirty_window_size = self.dirty_time_windows.window_size();
104 let max_query_update_range = (*time_window_size)
106 .unwrap_or_default()
107 .mul_f64(DirtyTimeWindows::MAX_FILTER_NUM as f64);
108 if cur_dirty_window_size < max_query_update_range {
111 self.last_update_time + next_duration
112 } else {
113 debug!(
116 "Flow id = {}, still have too many {} dirty time window({:?}), execute immediately",
117 flow_id,
118 self.dirty_time_windows.windows.len(),
119 self.dirty_time_windows.windows
120 );
121 Instant::now()
122 }
123 }
124}
125
/// An ordered set of time windows that are marked dirty.
#[derive(Debug, Clone, Default)]
pub struct DirtyTimeWindows {
    /// Maps a window's lower bound to its optional upper bound.
    ///
    /// A `None` upper bound means the end is not known yet; the merge and filter-generation
    /// code substitutes `lower bound + window size` in that case.
    windows: BTreeMap<Timestamp, Option<Timestamp>>,
}
134
135impl DirtyTimeWindows {
    /// Merge distance, in units of the window size: during merging, a window whose lower bound
    /// is at most `window_size * MERGE_DIST` past the previous window's upper bound is folded
    /// into the previous window.
    pub const MERGE_DIST: i32 = 3;

    /// Number of dirty windows above which filter generation logs a warning; also used as the
    /// multiplier of the window size when estimating how much range one query can cover.
    pub const MAX_FILTER_NUM: usize = 20;
143
144 pub fn add_lower_bounds(&mut self, lower_bounds: impl Iterator<Item = Timestamp>) {
150 for lower_bound in lower_bounds {
151 let entry = self.windows.entry(lower_bound);
152 entry.or_insert(None);
153 }
154 }
155
156 pub fn window_size(&self) -> Duration {
157 let mut ret = Duration::from_secs(0);
158 for (start, end) in &self.windows {
159 if let Some(end) = end {
160 if let Some(duration) = end.sub(start) {
161 ret += duration.to_std().unwrap_or_default();
162 }
163 }
164 }
165 ret
166 }
167
168 pub fn add_window(&mut self, start: Timestamp, end: Option<Timestamp>) {
169 self.windows.insert(start, end);
170 }
171
172 pub fn clean(&mut self) {
174 self.windows.clear();
175 }
176
177 pub fn len(&self) -> usize {
179 self.windows.len()
180 }
181
182 pub fn effective_count(&self, window_size: &Duration) -> usize {
185 if self.windows.is_empty() {
186 return 0;
187 }
188 let window_size =
189 chrono::Duration::from_std(*window_size).unwrap_or(chrono::Duration::zero());
190 let total_window_time_range =
191 self.windows
192 .iter()
193 .fold(chrono::Duration::zero(), |acc, (start, end)| {
194 if let Some(end) = end {
195 acc + end.sub(start).unwrap_or(chrono::Duration::zero())
196 } else {
197 acc + window_size
198 }
199 });
200
201 if window_size.num_seconds() == 0 {
203 0
204 } else {
205 (total_window_time_range.num_seconds() / window_size.num_seconds()) as usize
206 }
207 }
208
209 pub fn gen_filter_exprs(
215 &mut self,
216 col_name: &str,
217 expire_lower_bound: Option<Timestamp>,
218 window_size: chrono::Duration,
219 window_cnt: usize,
220 flow_id: FlowId,
221 task_ctx: Option<&BatchingTask>,
222 ) -> Result<Option<datafusion_expr::Expr>, Error> {
223 ensure!(
224 window_size.num_seconds() > 0,
225 UnexpectedSnafu {
226 reason: "window_size is zero, can't generate filter exprs",
227 }
228 );
229
230 debug!(
231 "expire_lower_bound: {:?}, window_size: {:?}",
232 expire_lower_bound.map(|t| t.to_iso8601_string()),
233 window_size
234 );
235 self.merge_dirty_time_windows(window_size, expire_lower_bound)?;
236
237 if self.windows.len() > Self::MAX_FILTER_NUM {
238 let first_time_window = self.windows.first_key_value();
239 let last_time_window = self.windows.last_key_value();
240
241 if let Some(task_ctx) = task_ctx {
242 warn!(
243 "Flow id = {:?}, too many time windows: {}, only the first {} are taken for this query, the group by expression might be wrong. Time window expr={:?}, expire_after={:?}, first_time_window={:?}, last_time_window={:?}, the original query: {:?}",
244 task_ctx.config.flow_id,
245 self.windows.len(),
246 Self::MAX_FILTER_NUM,
247 task_ctx.config.time_window_expr,
248 task_ctx.config.expire_after,
249 first_time_window,
250 last_time_window,
251 task_ctx.config.query
252 );
253 } else {
254 warn!("Flow id = {:?}, too many time windows: {}, only the first {} are taken for this query, the group by expression might be wrong. first_time_window={:?}, last_time_window={:?}",
255 flow_id,
256 self.windows.len(),
257 Self::MAX_FILTER_NUM,
258 first_time_window,
259 last_time_window
260 )
261 }
262 }
263
264 let max_time_range = window_size * window_cnt as i32;
266
267 let mut to_be_query = BTreeMap::new();
268 let mut new_windows = self.windows.clone();
269 let mut cur_time_range = chrono::Duration::zero();
270 for (idx, (start, end)) in self.windows.iter().enumerate() {
271 let first_end = start
272 .add_duration(window_size.to_std().unwrap())
273 .context(TimeSnafu)?;
274 let end = end.unwrap_or(first_end);
275
276 if cur_time_range >= max_time_range {
278 break;
279 }
280
281 if idx >= window_cnt {
283 break;
284 }
285
286 let Some(x) = end.sub(start) else {
287 continue;
288 };
289 if cur_time_range + x <= max_time_range {
290 to_be_query.insert(*start, Some(end));
291 new_windows.remove(start);
292 cur_time_range += x;
293 } else {
294 let surplus = max_time_range - cur_time_range;
297 if surplus.num_seconds() <= window_size.num_seconds() {
298 break;
300 }
301 let times = surplus.num_seconds() / window_size.num_seconds();
302
303 let split_offset = window_size * times as i32;
304 let split_at = start
305 .add_duration(split_offset.to_std().unwrap())
306 .context(TimeSnafu)?;
307 to_be_query.insert(*start, Some(split_at));
308
309 new_windows.remove(start);
311 new_windows.insert(split_at, Some(end));
312 cur_time_range += split_offset;
313 break;
314 }
315 }
316
317 self.windows = new_windows;
318
319 METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_CNT
320 .with_label_values(&[flow_id.to_string().as_str()])
321 .observe(to_be_query.len() as f64);
322
323 let full_time_range = to_be_query
324 .iter()
325 .fold(chrono::Duration::zero(), |acc, (start, end)| {
326 if let Some(end) = end {
327 acc + end.sub(start).unwrap_or(chrono::Duration::zero())
328 } else {
329 acc + window_size
330 }
331 })
332 .num_seconds() as f64;
333 METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_SIZE
334 .with_label_values(&[flow_id.to_string().as_str()])
335 .observe(full_time_range);
336
337 let stalled_time_range =
338 self.windows
339 .iter()
340 .fold(chrono::Duration::zero(), |acc, (start, end)| {
341 if let Some(end) = end {
342 acc + end.sub(start).unwrap_or(chrono::Duration::zero())
343 } else {
344 acc + window_size
345 }
346 });
347
348 METRIC_FLOW_BATCHING_ENGINE_STALLED_WINDOW_SIZE
349 .with_label_values(&[flow_id.to_string().as_str()])
350 .observe(stalled_time_range.num_seconds() as f64);
351
352 let mut expr_lst = vec![];
353 for (start, end) in to_be_query.into_iter() {
354 let (start, end) = if let Some(ctx) = task_ctx {
356 let Some(time_window_expr) = &ctx.config.time_window_expr else {
357 UnexpectedSnafu {
358 reason: "time_window_expr is not set",
359 }
360 .fail()?
361 };
362 self.align_time_window(start, end, time_window_expr)?
363 } else {
364 (start, end)
365 };
366 debug!(
367 "Time window start: {:?}, end: {:?}",
368 start.to_iso8601_string(),
369 end.map(|t| t.to_iso8601_string())
370 );
371
372 use datafusion_expr::{col, lit};
373 let lower = to_df_literal(start)?;
374 let upper = end.map(to_df_literal).transpose()?;
375 let expr = if let Some(upper) = upper {
376 col(col_name)
377 .gt_eq(lit(lower))
378 .and(col(col_name).lt(lit(upper)))
379 } else {
380 col(col_name).gt_eq(lit(lower))
381 };
382 expr_lst.push(expr);
383 }
384 let expr = expr_lst.into_iter().reduce(|a, b| a.or(b));
385 Ok(expr)
386 }
387
388 fn align_time_window(
389 &self,
390 start: Timestamp,
391 end: Option<Timestamp>,
392 time_window_expr: &TimeWindowExpr,
393 ) -> Result<(Timestamp, Option<Timestamp>), Error> {
394 let align_start = time_window_expr.eval(start)?.0.context(UnexpectedSnafu {
395 reason: format!(
396 "Failed to align start time {:?} with time window expr {:?}",
397 start, time_window_expr
398 ),
399 })?;
400 let align_end = end
401 .and_then(|end| {
402 time_window_expr
403 .eval(end)
404 .map(|r| if r.0 == Some(end) { r.0 } else { r.1 })
406 .transpose()
407 })
408 .transpose()?;
409 Ok((align_start, align_end))
410 }
411
412 pub fn merge_dirty_time_windows(
416 &mut self,
417 window_size: chrono::Duration,
418 expire_lower_bound: Option<Timestamp>,
419 ) -> Result<(), Error> {
420 if self.windows.is_empty() {
421 return Ok(());
422 }
423
424 let mut new_windows = BTreeMap::new();
425
426 let std_window_size = window_size.to_std().map_err(|e| {
427 InternalSnafu {
428 reason: e.to_string(),
429 }
430 .build()
431 })?;
432
433 let mut prev_tw = None;
435 for (lower_bound, upper_bound) in std::mem::take(&mut self.windows) {
436 if let Some(expire_lower_bound) = expire_lower_bound {
438 if lower_bound < expire_lower_bound {
439 continue;
440 }
441 }
442
443 let Some(prev_tw) = &mut prev_tw else {
444 prev_tw = Some((lower_bound, upper_bound));
445 continue;
446 };
447
448 let prev_upper = prev_tw
451 .1
452 .unwrap_or(prev_tw.0.add_duration(std_window_size).context(TimeSnafu)?);
453 prev_tw.1 = Some(prev_upper);
454
455 let cur_upper = upper_bound.unwrap_or(
456 lower_bound
457 .add_duration(std_window_size)
458 .context(TimeSnafu)?,
459 );
460
461 if lower_bound
462 .sub(&prev_upper)
463 .map(|dist| dist <= window_size * Self::MERGE_DIST)
464 .unwrap_or(false)
465 {
466 prev_tw.1 = Some(cur_upper);
467 } else {
468 new_windows.insert(prev_tw.0, prev_tw.1);
469 *prev_tw = (lower_bound, Some(cur_upper));
470 }
471 }
472
473 if let Some(prev_tw) = prev_tw {
474 new_windows.insert(prev_tw.0, prev_tw.1);
475 }
476
477 self.windows = new_windows;
478
479 Ok(())
480 }
481}
482
483fn to_df_literal(value: Timestamp) -> Result<datafusion_common::ScalarValue, Error> {
484 let value = Value::from(value);
485 let value = value
486 .try_to_scalar_value(&value.data_type())
487 .with_context(|_| DatatypesSnafu {
488 extra: format!("Failed to convert to scalar value: {}", value),
489 })?;
490 Ok(value)
491}
492
/// Execution state of a batching task.
#[derive(Debug, Clone)]
enum ExecState {
    /// No query is running; this is the state at construction and after a query finished.
    Idle,
    /// NOTE(review): presumably set while a query is running — nothing in this file
    /// constructs it, confirm against the rest of the crate.
    Executing,
}
498
499#[cfg(test)]
500mod test {
501 use pretty_assertions::assert_eq;
502 use session::context::QueryContext;
503
504 use super::*;
505 use crate::batching_mode::time_window::find_time_window_expr;
506 use crate::batching_mode::utils::sql_to_df_plan;
507 use crate::test_utils::create_test_query_engine;
508
    /// Table-driven test: each case lists the dirty lower bounds, the
    /// `(window_size, expire_lower_bound)` pair, the windows expected after merging, and the
    /// SQL rendering of the filter expected from `gen_filter_exprs` with a budget of
    /// `MAX_FILTER_NUM` windows.
    #[test]
    fn test_merge_dirty_time_windows() {
        let testcases = vec![
            // Gap between the two windows is exactly `MERGE_DIST` window sizes: merged.
            (
                vec![
                    Timestamp::new_second(0),
                    Timestamp::new_second((1 + DirtyTimeWindows::MERGE_DIST as i64) * 5 * 60),
                ],
                (chrono::Duration::seconds(5 * 60), None),
                BTreeMap::from([(
                    Timestamp::new_second(0),
                    Some(Timestamp::new_second(
                        (2 + DirtyTimeWindows::MERGE_DIST as i64) * 5 * 60,
                    )),
                )]),
                Some(
                    "((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:25:00' AS TIMESTAMP)))",
                )
            ),
            // Gap is one window size more than `MERGE_DIST`: kept separate.
            (
                vec![
                    Timestamp::new_second(0),
                    Timestamp::new_second((2 + DirtyTimeWindows::MERGE_DIST as i64) * 5 * 60),
                ],
                (chrono::Duration::seconds(5 * 60), None),
                BTreeMap::from([
                    (
                        Timestamp::new_second(0),
                        Some(Timestamp::new_second(5 * 60)),
                    ),
                    (
                        Timestamp::new_second((2 + DirtyTimeWindows::MERGE_DIST as i64) * 5 * 60),
                        Some(Timestamp::new_second(
                            (3 + DirtyTimeWindows::MERGE_DIST as i64) * 5 * 60,
                        )),
                    ),
                ]),
                Some(
                    "(((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:05:00' AS TIMESTAMP))) OR ((ts >= CAST('1970-01-01 00:25:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:30:00' AS TIMESTAMP))))",
                )
            ),
            // Gap is smaller than `MERGE_DIST` window sizes: merged.
            (
                vec![
                    Timestamp::new_second(0),
                    Timestamp::new_second((DirtyTimeWindows::MERGE_DIST as i64) * 5 * 60),
                ],
                (chrono::Duration::seconds(5 * 60), None),
                BTreeMap::from([(
                    Timestamp::new_second(0),
                    Some(Timestamp::new_second(
                        (1 + DirtyTimeWindows::MERGE_DIST as i64) * 5 * 60,
                    )),
                )]),
                Some(
                    "((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:20:00' AS TIMESTAMP)))",
                )
            ),
            // Three windows, each within merge distance of the previous one: chained into one.
            (
                vec![
                    Timestamp::new_second(0),
                    Timestamp::new_second((DirtyTimeWindows::MERGE_DIST as i64) * 3),
                    Timestamp::new_second((DirtyTimeWindows::MERGE_DIST as i64) * 3 * 2),
                ],
                (chrono::Duration::seconds(3), None),
                BTreeMap::from([(
                    Timestamp::new_second(0),
                    Some(Timestamp::new_second(
                        (DirtyTimeWindows::MERGE_DIST as i64) * 7
                    )),
                )]),
                Some(
                    "((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:00:21' AS TIMESTAMP)))",
                )
            ),
            // 20 adjacent windows plus one beyond merge distance: the merged run already uses
            // up the whole time budget, so the far window is left out of the filter.
            (
                Vec::from_iter((0..20).map(|i|Timestamp::new_second(i*3)).chain(std::iter::once(
                    Timestamp::new_second(60 + 3 * (DirtyTimeWindows::MERGE_DIST as i64 + 1)),
                ))),
                (chrono::Duration::seconds(3), None),
                BTreeMap::from([
                    (
                        Timestamp::new_second(0),
                        Some(Timestamp::new_second(
                            60
                        )),
                    ),
                    (
                        Timestamp::new_second(60 + 3 * (DirtyTimeWindows::MERGE_DIST as i64 + 1)),
                        Some(Timestamp::new_second(
                            60 + 3 * (DirtyTimeWindows::MERGE_DIST as i64 + 1) + 3
                        )),
                    )]),
                Some(
                    "((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:01:00' AS TIMESTAMP)))",
                )
            ),
            // 40 adjacent windows merge into one that exceeds the budget: it is split and only
            // the first 20 window sizes are queried.
            (
                Vec::from_iter((0..40).map(|i|Timestamp::new_second(i*3))),
                (chrono::Duration::seconds(3), None),
                BTreeMap::from([
                    (
                        Timestamp::new_second(0),
                        Some(Timestamp::new_second(
                            40 * 3
                        )),
                    )]),
                Some(
                    "((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:01:00' AS TIMESTAMP)))",
                )
            ),
            // One small window followed by a large merged one: the small one is taken whole and
            // the large one is split at what is left of the budget.
            (
                Vec::from_iter(std::iter::once(Timestamp::new_second(0)).chain((0..40).map(|i|Timestamp::new_second(20+i*3)))),
                (chrono::Duration::seconds(3), None),
                BTreeMap::from([
                    (
                        Timestamp::new_second(0),
                        Some(Timestamp::new_second(
                            3
                        )),
                    ),(
                        Timestamp::new_second(20),
                        Some(Timestamp::new_second(
                            140
                        )),
                    )]),
                Some(
                    "(((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:00:03' AS TIMESTAMP))) OR ((ts >= CAST('1970-01-01 00:00:20' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:01:17' AS TIMESTAMP))))",
                )
            ),
            // Both lower bounds lie below the expire lower bound: everything is dropped.
            (
                vec![
                    Timestamp::new_second(0),
                    Timestamp::new_second((DirtyTimeWindows::MERGE_DIST as i64) * 5 * 60),
                ],
                (
                    chrono::Duration::seconds(5 * 60),
                    Some(Timestamp::new_second(
                        (DirtyTimeWindows::MERGE_DIST as i64) * 6 * 60,
                    )),
                ),
                BTreeMap::from([]),
                None
            ),
        ];
        for (lower_bounds, (window_size, expire_lower_bound), expected, expected_filter_expr) in
            testcases
        {
            let mut dirty = DirtyTimeWindows::default();
            dirty.add_lower_bounds(lower_bounds.into_iter());
            // First check the merge result on its own ...
            dirty
                .merge_dirty_time_windows(window_size, expire_lower_bound)
                .unwrap();
            assert_eq!(expected, dirty.windows);
            // ... then the filter generated from the merged windows (no task context, so no
            // alignment is applied).
            let filter_expr = dirty
                .gen_filter_exprs(
                    "ts",
                    expire_lower_bound,
                    window_size,
                    DirtyTimeWindows::MAX_FILTER_NUM,
                    0,
                    None,
                )
                .unwrap();

            // Compare through the SQL rendering of the expression.
            let unparser = datafusion::sql::unparser::Unparser::default();
            let to_sql = filter_expr
                .as_ref()
                .map(|e| unparser.expr_to_sql(e).unwrap().to_string());
            assert_eq!(expected_filter_expr, to_sql.as_deref());
        }
    }
690
    /// Checks `align_time_window` against a time window expression extracted from a real
    /// query plan: each case pairs a window before alignment with the window expected after.
    #[tokio::test]
    async fn test_align_time_window() {
        type TimeWindow = (Timestamp, Option<Timestamp>);
        struct TestCase {
            // Query whose time window expression is used for alignment.
            sql: String,
            // Pairs of (window before alignment, expected window after alignment).
            aligns: Vec<(TimeWindow, TimeWindow)>,
        }
        let testcases: Vec<TestCase> = vec![TestCase{
            sql: "SELECT date_bin(INTERVAL '5 second', ts) AS time_window FROM numbers_with_ts GROUP BY time_window;".to_string(),
            aligns: vec![
                // Start is moved down to the 5-second boundary; a missing end stays missing.
                ((Timestamp::new_second(3), None), (Timestamp::new_second(0), None)),
                ((Timestamp::new_second(8), None), (Timestamp::new_second(5), None)),
                // An end that already sits on a boundary is kept.
                ((Timestamp::new_second(8), Some(Timestamp::new_second(10))), (Timestamp::new_second(5), Some(Timestamp::new_second(10)))),
                // An end inside a window is moved up to that window's upper boundary.
                ((Timestamp::new_second(8), Some(Timestamp::new_second(9))), (Timestamp::new_second(5), Some(Timestamp::new_second(10)))),
            ],
        }];

        let query_engine = create_test_query_engine();
        let ctx = QueryContext::arc();
        for TestCase { sql, aligns } in testcases {
            let plan = sql_to_df_plan(ctx.clone(), query_engine.clone(), &sql, true)
                .await
                .unwrap();

            let (column_name, time_window_expr, _, df_schema) = find_time_window_expr(
                &plan,
                query_engine.engine_state().catalog_manager().clone(),
                ctx.clone(),
            )
            .await
            .unwrap();

            // The query must contain a time window expression; both unwraps enforce that.
            let time_window_expr = time_window_expr
                .map(|expr| {
                    TimeWindowExpr::from_expr(
                        &expr,
                        &column_name,
                        &df_schema,
                        &query_engine.engine_state().session_state(),
                    )
                })
                .transpose()
                .unwrap()
                .unwrap();

            let dirty = DirtyTimeWindows::default();
            for (before_align, expected_after_align) in aligns {
                let after_align = dirty
                    .align_time_window(before_align.0, before_align.1, &time_window_expr)
                    .unwrap();
                assert_eq!(expected_after_align, after_align);
            }
        }
    }
745}