1use std::collections::BTreeMap;
18use std::time::Duration;
19
20use common_telemetry::debug;
21use common_telemetry::tracing::warn;
22use common_time::Timestamp;
23use datatypes::value::Value;
24use session::context::QueryContextRef;
25use snafu::{OptionExt, ResultExt};
26use tokio::sync::oneshot;
27use tokio::time::Instant;
28
29use crate::batching_mode::task::BatchingTask;
30use crate::batching_mode::time_window::TimeWindowExpr;
31use crate::batching_mode::MIN_REFRESH_DURATION;
32use crate::error::{DatatypesSnafu, InternalSnafu, TimeSnafu, UnexpectedSnafu};
33use crate::metrics::{
34 METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME_RANGE, METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_CNT,
35};
36use crate::{Error, FlowId};
37
/// Mutable runtime state kept for a single batching-mode flow task.
#[derive(Debug)]
pub struct TaskState {
    /// Query context handed in when the state is created.
    pub(crate) query_ctx: QueryContextRef,
    /// Instant at which the state was created or the last query finished,
    /// whichever happened most recently.
    last_update_time: Instant,
    /// Duration of the most recent query; zero until a query has completed.
    last_query_duration: Duration,
    /// Time windows that still have to be (re)computed, as `start -> optional end`.
    pub(crate) dirty_time_windows: DirtyTimeWindows,
    /// Execution state; starts as `Idle` and is reset to `Idle` after each query.
    exec_state: ExecState,
    /// Receiving half of the one-shot shutdown channel.
    pub(crate) shutdown_rx: oneshot::Receiver<()>,
    /// Join handle of the task's tokio task; `None` right after construction.
    pub(crate) task_handle: Option<tokio::task::JoinHandle<()>>,
}
56impl TaskState {
57 pub fn new(query_ctx: QueryContextRef, shutdown_rx: oneshot::Receiver<()>) -> Self {
58 Self {
59 query_ctx,
60 last_update_time: Instant::now(),
61 last_query_duration: Duration::from_secs(0),
62 dirty_time_windows: Default::default(),
63 exec_state: ExecState::Idle,
64 shutdown_rx,
65 task_handle: None,
66 }
67 }
68
69 pub fn after_query_exec(&mut self, elapsed: Duration, _is_succ: bool) {
72 self.exec_state = ExecState::Idle;
73 self.last_query_duration = elapsed;
74 self.last_update_time = Instant::now();
75 }
76
77 pub fn get_next_start_query_time(
88 &self,
89 flow_id: FlowId,
90 time_window_size: &Option<Duration>,
91 max_timeout: Option<Duration>,
92 ) -> Instant {
93 let lower = time_window_size.unwrap_or(MIN_REFRESH_DURATION);
95 let next_duration = self.last_query_duration.max(lower);
96 let next_duration = if let Some(max_timeout) = max_timeout {
97 next_duration.min(max_timeout)
98 } else {
99 next_duration
100 };
101
102 let cur_dirty_window_size = self.dirty_time_windows.window_size();
103 let max_query_update_range = (*time_window_size)
105 .unwrap_or_default()
106 .mul_f64(DirtyTimeWindows::MAX_FILTER_NUM as f64);
107 if cur_dirty_window_size < max_query_update_range {
110 self.last_update_time + next_duration
111 } else {
112 debug!(
115 "Flow id = {}, still have too many {} dirty time window({:?}), execute immediately",
116 flow_id,
117 self.dirty_time_windows.windows.len(),
118 self.dirty_time_windows.windows
119 );
120 Instant::now()
121 }
122 }
123}
124
/// Collection of dirty time windows, ordered by window start.
#[derive(Debug, Clone, Default)]
pub struct DirtyTimeWindows {
    /// Maps a window's start to its optional end. Entries added through
    /// `add_lower_bounds` start with a `None` end; `merge_dirty_time_windows`
    /// fills ends in while merging.
    windows: BTreeMap<Timestamp, Option<Timestamp>>,
}
133
134impl DirtyTimeWindows {
    /// Merge distance in multiples of the window size: `merge_dirty_time_windows`
    /// merges two windows when the gap between them is at most
    /// `window_size * MERGE_DIST`.
    pub const MERGE_DIST: i32 = 3;
139
    /// Number of dirty windows above which `gen_filter_exprs` logs a warning;
    /// also used as the multiplier for the per-query time-range budget in
    /// `TaskState::get_next_start_query_time`.
    pub const MAX_FILTER_NUM: usize = 20;
142
143 pub fn add_lower_bounds(&mut self, lower_bounds: impl Iterator<Item = Timestamp>) {
149 for lower_bound in lower_bounds {
150 let entry = self.windows.entry(lower_bound);
151 entry.or_insert(None);
152 }
153 }
154
155 pub fn window_size(&self) -> Duration {
156 let mut ret = Duration::from_secs(0);
157 for (start, end) in &self.windows {
158 if let Some(end) = end {
159 if let Some(duration) = end.sub(start) {
160 ret += duration.to_std().unwrap_or_default();
161 }
162 }
163 }
164 ret
165 }
166
167 pub fn add_window(&mut self, start: Timestamp, end: Option<Timestamp>) {
168 self.windows.insert(start, end);
169 }
170
171 pub fn clean(&mut self) {
173 self.windows.clear();
174 }
175
176 pub fn len(&self) -> usize {
178 self.windows.len()
179 }
180
181 pub fn gen_filter_exprs(
187 &mut self,
188 col_name: &str,
189 expire_lower_bound: Option<Timestamp>,
190 window_size: chrono::Duration,
191 window_cnt: usize,
192 flow_id: FlowId,
193 task_ctx: Option<&BatchingTask>,
194 ) -> Result<Option<datafusion_expr::Expr>, Error> {
195 debug!(
196 "expire_lower_bound: {:?}, window_size: {:?}",
197 expire_lower_bound.map(|t| t.to_iso8601_string()),
198 window_size
199 );
200 self.merge_dirty_time_windows(window_size, expire_lower_bound)?;
201
202 if self.windows.len() > Self::MAX_FILTER_NUM {
203 let first_time_window = self.windows.first_key_value();
204 let last_time_window = self.windows.last_key_value();
205
206 if let Some(task_ctx) = task_ctx {
207 warn!(
208 "Flow id = {:?}, too many time windows: {}, only the first {} are taken for this query, the group by expression might be wrong. Time window expr={:?}, expire_after={:?}, first_time_window={:?}, last_time_window={:?}, the original query: {:?}",
209 task_ctx.config.flow_id,
210 self.windows.len(),
211 Self::MAX_FILTER_NUM,
212 task_ctx.config.time_window_expr,
213 task_ctx.config.expire_after,
214 first_time_window,
215 last_time_window,
216 task_ctx.config.query
217 );
218 } else {
219 warn!("Flow id = {:?}, too many time windows: {}, only the first {} are taken for this query, the group by expression might be wrong. first_time_window={:?}, last_time_window={:?}",
220 flow_id,
221 self.windows.len(),
222 Self::MAX_FILTER_NUM,
223 first_time_window,
224 last_time_window
225 )
226 }
227 }
228
229 let max_time_range = window_size * window_cnt as i32;
231 let nth = {
232 let mut cur_time_range = chrono::Duration::zero();
233 let mut nth_key = None;
234 for (idx, (start, end)) in self.windows.iter().enumerate() {
235 if cur_time_range > max_time_range {
237 nth_key = Some(*start);
238 break;
239 }
240
241 if idx >= window_cnt {
243 nth_key = Some(*start);
244 break;
245 }
246
247 if let Some(end) = end {
248 if let Some(x) = end.sub(start) {
249 cur_time_range += x;
250 }
251 }
252 }
253
254 nth_key
255 };
256 let first_nth = {
257 if let Some(nth) = nth {
258 let mut after = self.windows.split_off(&nth);
259 std::mem::swap(&mut self.windows, &mut after);
260
261 after
262 } else {
263 std::mem::take(&mut self.windows)
264 }
265 };
266
267 METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_CNT
268 .with_label_values(&[flow_id.to_string().as_str()])
269 .observe(first_nth.len() as f64);
270
271 let full_time_range = first_nth
272 .iter()
273 .fold(chrono::Duration::zero(), |acc, (start, end)| {
274 if let Some(end) = end {
275 acc + end.sub(start).unwrap_or(chrono::Duration::zero())
276 } else {
277 acc
278 }
279 })
280 .num_seconds() as f64;
281 METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME_RANGE
282 .with_label_values(&[flow_id.to_string().as_str()])
283 .observe(full_time_range);
284
285 let mut expr_lst = vec![];
286 for (start, end) in first_nth.into_iter() {
287 let (start, end) = if let Some(ctx) = task_ctx {
289 let Some(time_window_expr) = &ctx.config.time_window_expr else {
290 UnexpectedSnafu {
291 reason: "time_window_expr is not set",
292 }
293 .fail()?
294 };
295 self.align_time_window(start, end, time_window_expr)?
296 } else {
297 (start, end)
298 };
299 debug!(
300 "Time window start: {:?}, end: {:?}",
301 start.to_iso8601_string(),
302 end.map(|t| t.to_iso8601_string())
303 );
304
305 use datafusion_expr::{col, lit};
306 let lower = to_df_literal(start)?;
307 let upper = end.map(to_df_literal).transpose()?;
308 let expr = if let Some(upper) = upper {
309 col(col_name)
310 .gt_eq(lit(lower))
311 .and(col(col_name).lt(lit(upper)))
312 } else {
313 col(col_name).gt_eq(lit(lower))
314 };
315 expr_lst.push(expr);
316 }
317 let expr = expr_lst.into_iter().reduce(|a, b| a.or(b));
318 Ok(expr)
319 }
320
321 fn align_time_window(
322 &self,
323 start: Timestamp,
324 end: Option<Timestamp>,
325 time_window_expr: &TimeWindowExpr,
326 ) -> Result<(Timestamp, Option<Timestamp>), Error> {
327 let align_start = time_window_expr.eval(start)?.0.context(UnexpectedSnafu {
328 reason: format!(
329 "Failed to align start time {:?} with time window expr {:?}",
330 start, time_window_expr
331 ),
332 })?;
333 let align_end = end
334 .and_then(|end| {
335 time_window_expr
336 .eval(end)
337 .map(|r| if r.0 == Some(end) { r.0 } else { r.1 })
339 .transpose()
340 })
341 .transpose()?;
342 Ok((align_start, align_end))
343 }
344
345 pub fn merge_dirty_time_windows(
349 &mut self,
350 window_size: chrono::Duration,
351 expire_lower_bound: Option<Timestamp>,
352 ) -> Result<(), Error> {
353 if self.windows.is_empty() {
354 return Ok(());
355 }
356
357 let mut new_windows = BTreeMap::new();
358
359 let std_window_size = window_size.to_std().map_err(|e| {
360 InternalSnafu {
361 reason: e.to_string(),
362 }
363 .build()
364 })?;
365
366 let mut prev_tw = None;
368 for (lower_bound, upper_bound) in std::mem::take(&mut self.windows) {
369 if let Some(expire_lower_bound) = expire_lower_bound {
371 if lower_bound < expire_lower_bound {
372 continue;
373 }
374 }
375
376 let Some(prev_tw) = &mut prev_tw else {
377 prev_tw = Some((lower_bound, upper_bound));
378 continue;
379 };
380
381 let prev_upper = prev_tw
384 .1
385 .unwrap_or(prev_tw.0.add_duration(std_window_size).context(TimeSnafu)?);
386 prev_tw.1 = Some(prev_upper);
387
388 let cur_upper = upper_bound.unwrap_or(
389 lower_bound
390 .add_duration(std_window_size)
391 .context(TimeSnafu)?,
392 );
393
394 if lower_bound
395 .sub(&prev_upper)
396 .map(|dist| dist <= window_size * Self::MERGE_DIST)
397 .unwrap_or(false)
398 {
399 prev_tw.1 = Some(cur_upper);
400 } else {
401 new_windows.insert(prev_tw.0, prev_tw.1);
402 *prev_tw = (lower_bound, Some(cur_upper));
403 }
404 }
405
406 if let Some(prev_tw) = prev_tw {
407 new_windows.insert(prev_tw.0, prev_tw.1);
408 }
409
410 self.windows = new_windows;
411
412 Ok(())
413 }
414}
415
416fn to_df_literal(value: Timestamp) -> Result<datafusion_common::ScalarValue, Error> {
417 let value = Value::from(value);
418 let value = value
419 .try_to_scalar_value(&value.data_type())
420 .with_context(|_| DatatypesSnafu {
421 extra: format!("Failed to convert to scalar value: {}", value),
422 })?;
423 Ok(value)
424}
425
/// Execution state of a batching task.
#[derive(Debug, Clone)]
enum ExecState {
    /// No query is running; set on construction and after every query.
    Idle,
    /// NOTE(review): presumably "a query is running"; this variant is never
    /// constructed anywhere in this file.
    Executing,
}
431
432#[cfg(test)]
433mod test {
434 use pretty_assertions::assert_eq;
435 use session::context::QueryContext;
436
437 use super::*;
438 use crate::batching_mode::time_window::find_time_window_expr;
439 use crate::batching_mode::utils::sql_to_df_plan;
440 use crate::test_utils::create_test_query_engine;
441
    /// Checks both the merged windows and the SQL rendering of the filter
    /// generated from them.
    ///
    /// Each case is `(lower bounds, (window size, expire lower bound),
    /// expected merged windows, expected filter SQL)`.
    #[test]
    fn test_merge_dirty_time_windows() {
        let testcases = vec![
            // Gap between the first window's end and the second start is exactly
            // `MERGE_DIST` windows: the two are merged.
            (
                vec![
                    Timestamp::new_second(0),
                    Timestamp::new_second((1 + DirtyTimeWindows::MERGE_DIST as i64) * 5 * 60),
                ],
                (chrono::Duration::seconds(5 * 60), None),
                BTreeMap::from([(
                    Timestamp::new_second(0),
                    Some(Timestamp::new_second(
                        (2 + DirtyTimeWindows::MERGE_DIST as i64) * 5 * 60,
                    )),
                )]),
                Some(
                    "((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:25:00' AS TIMESTAMP)))",
                )
            ),
            // Gap is one window larger than `MERGE_DIST` windows: the two stay
            // separate.
            (
                vec![
                    Timestamp::new_second(0),
                    Timestamp::new_second((2 + DirtyTimeWindows::MERGE_DIST as i64) * 5 * 60),
                ],
                (chrono::Duration::seconds(5 * 60), None),
                BTreeMap::from([
                    (
                        Timestamp::new_second(0),
                        Some(Timestamp::new_second(5 * 60)),
                    ),
                    (
                        Timestamp::new_second((2 + DirtyTimeWindows::MERGE_DIST as i64) * 5 * 60),
                        Some(Timestamp::new_second(
                            (3 + DirtyTimeWindows::MERGE_DIST as i64) * 5 * 60,
                        )),
                    ),
                ]),
                Some(
                    "(((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:05:00' AS TIMESTAMP))) OR ((ts >= CAST('1970-01-01 00:25:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:30:00' AS TIMESTAMP))))",
                )
            ),
            // Gap is smaller than `MERGE_DIST` windows: merged.
            (
                vec![
                    Timestamp::new_second(0),
                    Timestamp::new_second((DirtyTimeWindows::MERGE_DIST as i64) * 5 * 60),
                ],
                (chrono::Duration::seconds(5 * 60), None),
                BTreeMap::from([(
                    Timestamp::new_second(0),
                    Some(Timestamp::new_second(
                        (1 + DirtyTimeWindows::MERGE_DIST as i64) * 5 * 60,
                    )),
                )]),
                Some(
                    "((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:20:00' AS TIMESTAMP)))",
                )
            ),
            // Three windows, each close enough to the previous merged window:
            // all are chained into a single window.
            (
                vec![
                    Timestamp::new_second(0),
                    Timestamp::new_second((DirtyTimeWindows::MERGE_DIST as i64) * 3),
                    Timestamp::new_second((DirtyTimeWindows::MERGE_DIST as i64) * 3 * 2),
                ],
                (chrono::Duration::seconds(3), None),
                BTreeMap::from([(
                    Timestamp::new_second(0),
                    Some(Timestamp::new_second(
                        (DirtyTimeWindows::MERGE_DIST as i64) * 7
                    )),
                )]),
                Some(
                    "((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:00:21' AS TIMESTAMP)))",
                )
            ),
            // Every lower bound is before the expire lower bound: all windows
            // are dropped and no filter is produced.
            (
                vec![
                    Timestamp::new_second(0),
                    Timestamp::new_second((DirtyTimeWindows::MERGE_DIST as i64) * 5 * 60),
                ],
                (
                    chrono::Duration::seconds(5 * 60),
                    Some(Timestamp::new_second(
                        (DirtyTimeWindows::MERGE_DIST as i64) * 6 * 60,
                    )),
                ),
                BTreeMap::from([]),
                None
            ),
        ];
        for (lower_bounds, (window_size, expire_lower_bound), expected, expected_filter_expr) in
            testcases
        {
            let mut dirty = DirtyTimeWindows::default();
            dirty.add_lower_bounds(lower_bounds.into_iter());
            dirty
                .merge_dirty_time_windows(window_size, expire_lower_bound)
                .unwrap();
            assert_eq!(expected, dirty.windows);
            // No task context is passed, so the windows are used unaligned.
            let filter_expr = dirty
                .gen_filter_exprs(
                    "ts",
                    expire_lower_bound,
                    window_size,
                    DirtyTimeWindows::MAX_FILTER_NUM,
                    0,
                    None,
                )
                .unwrap();

            // Compare the filter through its SQL text.
            let unparser = datafusion::sql::unparser::Unparser::default();
            let to_sql = filter_expr
                .as_ref()
                .map(|e| unparser.expr_to_sql(e).unwrap().to_string());
            assert_eq!(expected_filter_expr, to_sql.as_deref());
        }
    }
563
    /// Checks `align_time_window` against the time window expression extracted
    /// from a `date_bin(INTERVAL '5 second', ts)` query.
    #[tokio::test]
    async fn test_align_time_window() {
        type TimeWindow = (Timestamp, Option<Timestamp>);
        struct TestCase {
            sql: String,
            // Pairs of `(window before alignment, expected window after alignment)`.
            aligns: Vec<(TimeWindow, TimeWindow)>,
        }
        let testcases: Vec<TestCase> = vec![TestCase{
            sql: "SELECT date_bin(INTERVAL '5 second', ts) AS time_window FROM numbers_with_ts GROUP BY time_window;".to_string(),
            aligns: vec![
                // Start is aligned down to the 5-second boundary; a missing end stays missing.
                ((Timestamp::new_second(3), None), (Timestamp::new_second(0), None)),
                ((Timestamp::new_second(8), None), (Timestamp::new_second(5), None)),
                // An end that already sits on a boundary is kept as is.
                ((Timestamp::new_second(8), Some(Timestamp::new_second(10))), (Timestamp::new_second(5), Some(Timestamp::new_second(10)))),
                // An end inside a window is moved up to the next boundary.
                ((Timestamp::new_second(8), Some(Timestamp::new_second(9))), (Timestamp::new_second(5), Some(Timestamp::new_second(10)))),
            ],
        }];

        let query_engine = create_test_query_engine();
        let ctx = QueryContext::arc();
        for TestCase { sql, aligns } in testcases {
            let plan = sql_to_df_plan(ctx.clone(), query_engine.clone(), &sql, true)
                .await
                .unwrap();

            let (column_name, time_window_expr, _, df_schema) = find_time_window_expr(
                &plan,
                query_engine.engine_state().catalog_manager().clone(),
                ctx.clone(),
            )
            .await
            .unwrap();

            // The query must contain a time window expression; the final
            // `unwrap` fails the test otherwise.
            let time_window_expr = time_window_expr
                .map(|expr| {
                    TimeWindowExpr::from_expr(
                        &expr,
                        &column_name,
                        &df_schema,
                        &query_engine.engine_state().session_state(),
                    )
                })
                .transpose()
                .unwrap()
                .unwrap();

            let dirty = DirtyTimeWindows::default();
            for (before_align, expected_after_align) in aligns {
                let after_align = dirty
                    .align_time_window(before_align.0, before_align.1, &time_window_expr)
                    .unwrap();
                assert_eq!(expected_after_align, after_align);
            }
        }
    }
618}