1use std::collections::BTreeMap;
18use std::time::Duration;
19
20use common_telemetry::debug;
21use common_telemetry::tracing::warn;
22use common_time::Timestamp;
23use datatypes::value::Value;
24use session::context::QueryContextRef;
25use snafu::{OptionExt, ResultExt};
26use tokio::sync::oneshot;
27use tokio::time::Instant;
28
29use crate::batching_mode::task::BatchingTask;
30use crate::batching_mode::time_window::TimeWindowExpr;
31use crate::batching_mode::MIN_REFRESH_DURATION;
32use crate::error::{DatatypesSnafu, InternalSnafu, TimeSnafu, UnexpectedSnafu};
33use crate::metrics::{
34 METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME_RANGE, METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_CNT,
35};
36use crate::{Error, FlowId};
37
38#[derive(Debug)]
40pub struct TaskState {
41 pub(crate) query_ctx: QueryContextRef,
43 last_update_time: Instant,
45 last_query_duration: Duration,
47 pub(crate) dirty_time_windows: DirtyTimeWindows,
50 exec_state: ExecState,
51 pub(crate) shutdown_rx: oneshot::Receiver<()>,
53 pub(crate) task_handle: Option<tokio::task::JoinHandle<()>>,
55}
56impl TaskState {
57 pub fn new(query_ctx: QueryContextRef, shutdown_rx: oneshot::Receiver<()>) -> Self {
58 Self {
59 query_ctx,
60 last_update_time: Instant::now(),
61 last_query_duration: Duration::from_secs(0),
62 dirty_time_windows: Default::default(),
63 exec_state: ExecState::Idle,
64 shutdown_rx,
65 task_handle: None,
66 }
67 }
68
69 pub fn after_query_exec(&mut self, elapsed: Duration, _is_succ: bool) {
72 self.exec_state = ExecState::Idle;
73 self.last_query_duration = elapsed;
74 self.last_update_time = Instant::now();
75 }
76
77 pub fn get_next_start_query_time(
88 &self,
89 flow_id: FlowId,
90 time_window_size: &Option<Duration>,
91 max_timeout: Option<Duration>,
92 ) -> Instant {
93 let last_duration = max_timeout
94 .unwrap_or(self.last_query_duration)
95 .min(self.last_query_duration)
96 .max(MIN_REFRESH_DURATION);
97
98 let next_duration = time_window_size
99 .map(|t| {
100 let half = t / 2;
101 half.max(last_duration)
102 })
103 .unwrap_or(last_duration);
104
105 if self.dirty_time_windows.windows.is_empty() {
107 self.last_update_time + next_duration
108 } else {
109 debug!(
110 "Flow id = {}, still have {} dirty time window({:?}), execute immediately",
111 flow_id,
112 self.dirty_time_windows.windows.len(),
113 self.dirty_time_windows.windows
114 );
115 Instant::now()
116 }
117 }
118}
119
120#[derive(Debug, Clone, Default)]
123pub struct DirtyTimeWindows {
124 windows: BTreeMap<Timestamp, Option<Timestamp>>,
127}
128
129impl DirtyTimeWindows {
130 pub const MERGE_DIST: i32 = 3;
134
135 pub const MAX_FILTER_NUM: usize = 20;
137
138 pub fn add_lower_bounds(&mut self, lower_bounds: impl Iterator<Item = Timestamp>) {
144 for lower_bound in lower_bounds {
145 let entry = self.windows.entry(lower_bound);
146 entry.or_insert(None);
147 }
148 }
149
150 pub fn add_window(&mut self, start: Timestamp, end: Option<Timestamp>) {
151 self.windows.insert(start, end);
152 }
153
154 pub fn clean(&mut self) {
156 self.windows.clear();
157 }
158
159 pub fn gen_filter_exprs(
165 &mut self,
166 col_name: &str,
167 expire_lower_bound: Option<Timestamp>,
168 window_size: chrono::Duration,
169 window_cnt: usize,
170 flow_id: FlowId,
171 task_ctx: Option<&BatchingTask>,
172 ) -> Result<Option<datafusion_expr::Expr>, Error> {
173 debug!(
174 "expire_lower_bound: {:?}, window_size: {:?}",
175 expire_lower_bound.map(|t| t.to_iso8601_string()),
176 window_size
177 );
178 self.merge_dirty_time_windows(window_size, expire_lower_bound)?;
179
180 if self.windows.len() > Self::MAX_FILTER_NUM {
181 let first_time_window = self.windows.first_key_value();
182 let last_time_window = self.windows.last_key_value();
183
184 if let Some(task_ctx) = task_ctx {
185 warn!(
186 "Flow id = {:?}, too many time windows: {}, only the first {} are taken for this query, the group by expression might be wrong. Time window expr={:?}, expire_after={:?}, first_time_window={:?}, last_time_window={:?}, the original query: {:?}",
187 task_ctx.config.flow_id,
188 self.windows.len(),
189 Self::MAX_FILTER_NUM,
190 task_ctx.config.time_window_expr,
191 task_ctx.config.expire_after,
192 first_time_window,
193 last_time_window,
194 task_ctx.config.query
195 );
196 } else {
197 warn!("Flow id = {:?}, too many time windows: {}, only the first {} are taken for this query, the group by expression might be wrong. first_time_window={:?}, last_time_window={:?}",
198 flow_id,
199 self.windows.len(),
200 Self::MAX_FILTER_NUM,
201 first_time_window,
202 last_time_window
203 )
204 }
205 }
206
207 let max_time_range = window_size * window_cnt as i32;
209 let nth = {
210 let mut cur_time_range = chrono::Duration::zero();
211 let mut nth_key = None;
212 for (idx, (start, end)) in self.windows.iter().enumerate() {
213 if cur_time_range > max_time_range {
215 nth_key = Some(*start);
216 break;
217 }
218
219 if idx >= window_cnt {
221 nth_key = Some(*start);
222 break;
223 }
224
225 if let Some(end) = end {
226 if let Some(x) = end.sub(start) {
227 cur_time_range += x;
228 }
229 }
230 }
231
232 nth_key
233 };
234 let first_nth = {
235 if let Some(nth) = nth {
236 let mut after = self.windows.split_off(&nth);
237 std::mem::swap(&mut self.windows, &mut after);
238
239 after
240 } else {
241 std::mem::take(&mut self.windows)
242 }
243 };
244
245 METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_CNT
246 .with_label_values(&[flow_id.to_string().as_str()])
247 .observe(first_nth.len() as f64);
248
249 let full_time_range = first_nth
250 .iter()
251 .fold(chrono::Duration::zero(), |acc, (start, end)| {
252 if let Some(end) = end {
253 acc + end.sub(start).unwrap_or(chrono::Duration::zero())
254 } else {
255 acc
256 }
257 })
258 .num_seconds() as f64;
259 METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME_RANGE
260 .with_label_values(&[flow_id.to_string().as_str()])
261 .observe(full_time_range);
262
263 let mut expr_lst = vec![];
264 for (start, end) in first_nth.into_iter() {
265 let (start, end) = if let Some(ctx) = task_ctx {
267 let Some(time_window_expr) = &ctx.config.time_window_expr else {
268 UnexpectedSnafu {
269 reason: "time_window_expr is not set",
270 }
271 .fail()?
272 };
273 self.align_time_window(start, end, time_window_expr)?
274 } else {
275 (start, end)
276 };
277 debug!(
278 "Time window start: {:?}, end: {:?}",
279 start.to_iso8601_string(),
280 end.map(|t| t.to_iso8601_string())
281 );
282
283 use datafusion_expr::{col, lit};
284 let lower = to_df_literal(start)?;
285 let upper = end.map(to_df_literal).transpose()?;
286 let expr = if let Some(upper) = upper {
287 col(col_name)
288 .gt_eq(lit(lower))
289 .and(col(col_name).lt(lit(upper)))
290 } else {
291 col(col_name).gt_eq(lit(lower))
292 };
293 expr_lst.push(expr);
294 }
295 let expr = expr_lst.into_iter().reduce(|a, b| a.or(b));
296 Ok(expr)
297 }
298
299 fn align_time_window(
300 &self,
301 start: Timestamp,
302 end: Option<Timestamp>,
303 time_window_expr: &TimeWindowExpr,
304 ) -> Result<(Timestamp, Option<Timestamp>), Error> {
305 let align_start = time_window_expr.eval(start)?.0.context(UnexpectedSnafu {
306 reason: format!(
307 "Failed to align start time {:?} with time window expr {:?}",
308 start, time_window_expr
309 ),
310 })?;
311 let align_end = end
312 .and_then(|end| {
313 time_window_expr
314 .eval(end)
315 .map(|r| if r.0 == Some(end) { r.0 } else { r.1 })
317 .transpose()
318 })
319 .transpose()?;
320 Ok((align_start, align_end))
321 }
322
323 pub fn merge_dirty_time_windows(
327 &mut self,
328 window_size: chrono::Duration,
329 expire_lower_bound: Option<Timestamp>,
330 ) -> Result<(), Error> {
331 if self.windows.is_empty() {
332 return Ok(());
333 }
334
335 let mut new_windows = BTreeMap::new();
336
337 let std_window_size = window_size.to_std().map_err(|e| {
338 InternalSnafu {
339 reason: e.to_string(),
340 }
341 .build()
342 })?;
343
344 let mut prev_tw = None;
346 for (lower_bound, upper_bound) in std::mem::take(&mut self.windows) {
347 if let Some(expire_lower_bound) = expire_lower_bound {
349 if lower_bound < expire_lower_bound {
350 continue;
351 }
352 }
353
354 let Some(prev_tw) = &mut prev_tw else {
355 prev_tw = Some((lower_bound, upper_bound));
356 continue;
357 };
358
359 let prev_upper = prev_tw
362 .1
363 .unwrap_or(prev_tw.0.add_duration(std_window_size).context(TimeSnafu)?);
364 prev_tw.1 = Some(prev_upper);
365
366 let cur_upper = upper_bound.unwrap_or(
367 lower_bound
368 .add_duration(std_window_size)
369 .context(TimeSnafu)?,
370 );
371
372 if lower_bound
373 .sub(&prev_upper)
374 .map(|dist| dist <= window_size * Self::MERGE_DIST)
375 .unwrap_or(false)
376 {
377 prev_tw.1 = Some(cur_upper);
378 } else {
379 new_windows.insert(prev_tw.0, prev_tw.1);
380 *prev_tw = (lower_bound, Some(cur_upper));
381 }
382 }
383
384 if let Some(prev_tw) = prev_tw {
385 new_windows.insert(prev_tw.0, prev_tw.1);
386 }
387
388 self.windows = new_windows;
389
390 Ok(())
391 }
392}
393
394fn to_df_literal(value: Timestamp) -> Result<datafusion_common::ScalarValue, Error> {
395 let value = Value::from(value);
396 let value = value
397 .try_to_scalar_value(&value.data_type())
398 .with_context(|_| DatatypesSnafu {
399 extra: format!("Failed to convert to scalar value: {}", value),
400 })?;
401 Ok(value)
402}
403
/// Execution status of a task's query.
#[derive(Debug, Clone)]
enum ExecState {
    /// No query is running.
    Idle,
    /// A query is running. NOTE(review): not constructed in the visible code.
    Executing,
}
409
#[cfg(test)]
mod test {
    use pretty_assertions::assert_eq;
    use session::context::QueryContext;

    use super::*;
    use crate::batching_mode::time_window::find_time_window_expr;
    use crate::batching_mode::utils::sql_to_df_plan;
    use crate::test_utils::create_test_query_engine;

    /// Checks both the merged dirty windows and the SQL filter generated from them.
    ///
    /// Each case is `(lower_bounds, (window_size, expire_lower_bound), expected_windows,
    /// expected_filter_sql)`.
    #[test]
    fn test_merge_dirty_time_windows() {
        let testcases = vec![
            // The gap between the two windows is exactly MERGE_DIST windows: merged into one.
            (
                vec![
                    Timestamp::new_second(0),
                    Timestamp::new_second((1 + DirtyTimeWindows::MERGE_DIST as i64) * 5 * 60),
                ],
                (chrono::Duration::seconds(5 * 60), None),
                BTreeMap::from([(
                    Timestamp::new_second(0),
                    Some(Timestamp::new_second(
                        (2 + DirtyTimeWindows::MERGE_DIST as i64) * 5 * 60,
                    )),
                )]),
                Some(
                    "((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:25:00' AS TIMESTAMP)))",
                )
            ),
            // The gap is one window more than MERGE_DIST: the windows stay separate.
            (
                vec![
                    Timestamp::new_second(0),
                    Timestamp::new_second((2 + DirtyTimeWindows::MERGE_DIST as i64) * 5 * 60),
                ],
                (chrono::Duration::seconds(5 * 60), None),
                BTreeMap::from([
                    (
                        Timestamp::new_second(0),
                        Some(Timestamp::new_second(5 * 60)),
                    ),
                    (
                        Timestamp::new_second((2 + DirtyTimeWindows::MERGE_DIST as i64) * 5 * 60),
                        Some(Timestamp::new_second(
                            (3 + DirtyTimeWindows::MERGE_DIST as i64) * 5 * 60,
                        )),
                    ),
                ]),
                Some(
                    "(((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:05:00' AS TIMESTAMP))) OR ((ts >= CAST('1970-01-01 00:25:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:30:00' AS TIMESTAMP))))",
                )
            ),
            // The gap is smaller than MERGE_DIST windows: merged into one.
            (
                vec![
                    Timestamp::new_second(0),
                    Timestamp::new_second((DirtyTimeWindows::MERGE_DIST as i64) * 5 * 60),
                ],
                (chrono::Duration::seconds(5 * 60), None),
                BTreeMap::from([(
                    Timestamp::new_second(0),
                    Some(Timestamp::new_second(
                        (1 + DirtyTimeWindows::MERGE_DIST as i64) * 5 * 60,
                    )),
                )]),
                Some(
                    "((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:20:00' AS TIMESTAMP)))",
                )
            ),
            // Three 3-second windows, each within merge distance of the previous: merged
            // transitively into a single window.
            (
                vec![
                    Timestamp::new_second(0),
                    Timestamp::new_second((DirtyTimeWindows::MERGE_DIST as i64) * 3),
                    Timestamp::new_second((DirtyTimeWindows::MERGE_DIST as i64) * 3 * 2),
                ],
                (chrono::Duration::seconds(3), None),
                BTreeMap::from([(
                    Timestamp::new_second(0),
                    Some(Timestamp::new_second(
                        (DirtyTimeWindows::MERGE_DIST as i64) * 7
                    )),
                )]),
                Some(
                    "((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:00:21' AS TIMESTAMP)))",
                )
            ),
            // Both windows start before the expire lower bound: all are dropped and no
            // filter is produced.
            (
                vec![
                    Timestamp::new_second(0),
                    Timestamp::new_second((DirtyTimeWindows::MERGE_DIST as i64) * 5 * 60),
                ],
                (
                    chrono::Duration::seconds(5 * 60),
                    Some(Timestamp::new_second(
                        (DirtyTimeWindows::MERGE_DIST as i64) * 6 * 60,
                    )),
                ),
                BTreeMap::from([]),
                None
            ),
        ];
        for (lower_bounds, (window_size, expire_lower_bound), expected, expected_filter_expr) in
            testcases
        {
            let mut dirty = DirtyTimeWindows::default();
            dirty.add_lower_bounds(lower_bounds.into_iter());
            dirty
                .merge_dirty_time_windows(window_size, expire_lower_bound)
                .unwrap();
            assert_eq!(expected, dirty.windows);
            // `gen_filter_exprs` merges again; merging an already merged set is a no-op here.
            let filter_expr = dirty
                .gen_filter_exprs(
                    "ts",
                    expire_lower_bound,
                    window_size,
                    DirtyTimeWindows::MAX_FILTER_NUM,
                    0,
                    None,
                )
                .unwrap();

            let unparser = datafusion::sql::unparser::Unparser::default();
            let to_sql = filter_expr
                .as_ref()
                .map(|e| unparser.expr_to_sql(e).unwrap().to_string());
            assert_eq!(expected_filter_expr, to_sql.as_deref());
        }
    }

    /// Checks that window bounds are aligned with a `date_bin` time window expression.
    ///
    /// A start is moved down to its window boundary. An end already on a boundary is kept;
    /// any other end is moved up to the next boundary.
    #[tokio::test]
    async fn test_align_time_window() {
        type TimeWindow = (Timestamp, Option<Timestamp>);
        struct TestCase {
            sql: String,
            aligns: Vec<(TimeWindow, TimeWindow)>,
        }
        let testcases: Vec<TestCase> = vec![TestCase{
            sql: "SELECT date_bin(INTERVAL '5 second', ts) AS time_window FROM numbers_with_ts GROUP BY time_window;".to_string(),
            aligns: vec![
                ((Timestamp::new_second(3), None), (Timestamp::new_second(0), None)),
                ((Timestamp::new_second(8), None), (Timestamp::new_second(5), None)),
                ((Timestamp::new_second(8), Some(Timestamp::new_second(10))), (Timestamp::new_second(5), Some(Timestamp::new_second(10)))),
                ((Timestamp::new_second(8), Some(Timestamp::new_second(9))), (Timestamp::new_second(5), Some(Timestamp::new_second(10)))),
            ],
        }];

        let query_engine = create_test_query_engine();
        let ctx = QueryContext::arc();
        for TestCase { sql, aligns } in testcases {
            let plan = sql_to_df_plan(ctx.clone(), query_engine.clone(), &sql, true)
                .await
                .unwrap();

            let (column_name, time_window_expr, _, df_schema) = find_time_window_expr(
                &plan,
                query_engine.engine_state().catalog_manager().clone(),
                ctx.clone(),
            )
            .await
            .unwrap();

            let time_window_expr = time_window_expr
                .map(|expr| {
                    TimeWindowExpr::from_expr(
                        &expr,
                        &column_name,
                        &df_schema,
                        &query_engine.engine_state().session_state(),
                    )
                })
                .transpose()
                .unwrap()
                .unwrap();

            let dirty = DirtyTimeWindows::default();
            for (before_align, expected_after_align) in aligns {
                let after_align = dirty
                    .align_time_window(before_align.0, before_align.1, &time_window_expr)
                    .unwrap();
                assert_eq!(expected_after_align, after_align);
            }
        }
    }
}