1use std::collections::BTreeMap;
18use std::time::Duration;
19
20use common_telemetry::debug;
21use common_telemetry::tracing::warn;
22use common_time::Timestamp;
23use datatypes::value::Value;
24use session::context::QueryContextRef;
25use snafu::{OptionExt, ResultExt};
26use tokio::sync::oneshot;
27use tokio::time::Instant;
28
29use crate::batching_mode::task::BatchingTask;
30use crate::batching_mode::time_window::TimeWindowExpr;
31use crate::batching_mode::MIN_REFRESH_DURATION;
32use crate::error::{DatatypesSnafu, InternalSnafu, TimeSnafu, UnexpectedSnafu};
33use crate::{Error, FlowId};
34
35#[derive(Debug)]
37pub struct TaskState {
38 pub(crate) query_ctx: QueryContextRef,
40 last_update_time: Instant,
42 last_query_duration: Duration,
44 pub(crate) dirty_time_windows: DirtyTimeWindows,
47 exec_state: ExecState,
48 pub(crate) shutdown_rx: oneshot::Receiver<()>,
50 pub(crate) task_handle: Option<tokio::task::JoinHandle<()>>,
52}
53impl TaskState {
54 pub fn new(query_ctx: QueryContextRef, shutdown_rx: oneshot::Receiver<()>) -> Self {
55 Self {
56 query_ctx,
57 last_update_time: Instant::now(),
58 last_query_duration: Duration::from_secs(0),
59 dirty_time_windows: Default::default(),
60 exec_state: ExecState::Idle,
61 shutdown_rx,
62 task_handle: None,
63 }
64 }
65
66 pub fn after_query_exec(&mut self, elapsed: Duration, _is_succ: bool) {
69 self.exec_state = ExecState::Idle;
70 self.last_query_duration = elapsed;
71 self.last_update_time = Instant::now();
72 }
73
74 pub fn get_next_start_query_time(
78 &self,
79 flow_id: FlowId,
80 max_timeout: Option<Duration>,
81 ) -> Instant {
82 let next_duration = max_timeout
83 .unwrap_or(self.last_query_duration)
84 .min(self.last_query_duration);
85 let next_duration = next_duration.max(MIN_REFRESH_DURATION);
86
87 if self.dirty_time_windows.windows.is_empty() {
89 self.last_update_time + next_duration
90 } else {
91 debug!(
92 "Flow id = {}, still have {} dirty time window({:?}), execute immediately",
93 flow_id,
94 self.dirty_time_windows.windows.len(),
95 self.dirty_time_windows.windows
96 );
97 Instant::now()
98 }
99 }
100}
101
102#[derive(Debug, Clone, Default)]
105pub struct DirtyTimeWindows {
106 windows: BTreeMap<Timestamp, Option<Timestamp>>,
109}
110
111impl DirtyTimeWindows {
112 const MERGE_DIST: i32 = 3;
116
117 const MAX_FILTER_NUM: usize = 20;
119
120 pub fn add_lower_bounds(&mut self, lower_bounds: impl Iterator<Item = Timestamp>) {
126 for lower_bound in lower_bounds {
127 let entry = self.windows.entry(lower_bound);
128 entry.or_insert(None);
129 }
130 }
131
132 pub fn add_window(&mut self, start: Timestamp, end: Option<Timestamp>) {
133 self.windows.insert(start, end);
134 }
135
136 pub fn clean(&mut self) {
138 self.windows.clear();
139 }
140
141 pub fn gen_filter_exprs(
143 &mut self,
144 col_name: &str,
145 expire_lower_bound: Option<Timestamp>,
146 window_size: chrono::Duration,
147 flow_id: FlowId,
148 task_ctx: Option<&BatchingTask>,
149 ) -> Result<Option<datafusion_expr::Expr>, Error> {
150 debug!(
151 "expire_lower_bound: {:?}, window_size: {:?}",
152 expire_lower_bound.map(|t| t.to_iso8601_string()),
153 window_size
154 );
155 self.merge_dirty_time_windows(window_size, expire_lower_bound)?;
156
157 if self.windows.len() > Self::MAX_FILTER_NUM {
158 let first_time_window = self.windows.first_key_value();
159 let last_time_window = self.windows.last_key_value();
160
161 if let Some(task_ctx) = task_ctx {
162 warn!(
163 "Flow id = {:?}, too many time windows: {}, only the first {} are taken for this query, the group by expression might be wrong. Time window expr={:?}, expire_after={:?}, first_time_window={:?}, last_time_window={:?}, the original query: {:?}",
164 task_ctx.config.flow_id,
165 self.windows.len(),
166 Self::MAX_FILTER_NUM,
167 task_ctx.config.time_window_expr,
168 task_ctx.config.expire_after,
169 first_time_window,
170 last_time_window,
171 task_ctx.config.query
172 );
173 } else {
174 warn!("Flow id = {:?}, too many time windows: {}, only the first {} are taken for this query, the group by expression might be wrong. first_time_window={:?}, last_time_window={:?}",
175 flow_id,
176 self.windows.len(),
177 Self::MAX_FILTER_NUM,
178 first_time_window,
179 last_time_window
180 )
181 }
182 }
183
184 let nth = self
186 .windows
187 .iter()
188 .nth(Self::MAX_FILTER_NUM)
189 .map(|(key, _)| *key);
190 let first_nth = {
191 if let Some(nth) = nth {
192 let mut after = self.windows.split_off(&nth);
193 std::mem::swap(&mut self.windows, &mut after);
194
195 after
196 } else {
197 std::mem::take(&mut self.windows)
198 }
199 };
200
201 let mut expr_lst = vec![];
202 for (start, end) in first_nth.into_iter() {
203 let (start, end) = if let Some(ctx) = task_ctx {
205 let Some(time_window_expr) = &ctx.config.time_window_expr else {
206 UnexpectedSnafu {
207 reason: "time_window_expr is not set",
208 }
209 .fail()?
210 };
211 self.align_time_window(start, end, time_window_expr)?
212 } else {
213 (start, end)
214 };
215 debug!(
216 "Time window start: {:?}, end: {:?}",
217 start.to_iso8601_string(),
218 end.map(|t| t.to_iso8601_string())
219 );
220
221 use datafusion_expr::{col, lit};
222 let lower = to_df_literal(start)?;
223 let upper = end.map(to_df_literal).transpose()?;
224 let expr = if let Some(upper) = upper {
225 col(col_name)
226 .gt_eq(lit(lower))
227 .and(col(col_name).lt(lit(upper)))
228 } else {
229 col(col_name).gt_eq(lit(lower))
230 };
231 expr_lst.push(expr);
232 }
233 let expr = expr_lst.into_iter().reduce(|a, b| a.or(b));
234 Ok(expr)
235 }
236
237 fn align_time_window(
238 &self,
239 start: Timestamp,
240 end: Option<Timestamp>,
241 time_window_expr: &TimeWindowExpr,
242 ) -> Result<(Timestamp, Option<Timestamp>), Error> {
243 let align_start = time_window_expr.eval(start)?.0.context(UnexpectedSnafu {
244 reason: format!(
245 "Failed to align start time {:?} with time window expr {:?}",
246 start, time_window_expr
247 ),
248 })?;
249 let align_end = end
250 .and_then(|end| {
251 time_window_expr
252 .eval(end)
253 .map(|r| if r.0 == Some(end) { r.0 } else { r.1 })
255 .transpose()
256 })
257 .transpose()?;
258 Ok((align_start, align_end))
259 }
260
261 pub fn merge_dirty_time_windows(
263 &mut self,
264 window_size: chrono::Duration,
265 expire_lower_bound: Option<Timestamp>,
266 ) -> Result<(), Error> {
267 if self.windows.is_empty() {
268 return Ok(());
269 }
270
271 let mut new_windows = BTreeMap::new();
272
273 let std_window_size = window_size.to_std().map_err(|e| {
274 InternalSnafu {
275 reason: e.to_string(),
276 }
277 .build()
278 })?;
279
280 let mut prev_tw = None;
282 for (lower_bound, upper_bound) in std::mem::take(&mut self.windows) {
283 if let Some(expire_lower_bound) = expire_lower_bound {
285 if lower_bound < expire_lower_bound {
286 continue;
287 }
288 }
289
290 let Some(prev_tw) = &mut prev_tw else {
291 prev_tw = Some((lower_bound, upper_bound));
292 continue;
293 };
294
295 let prev_upper = prev_tw
298 .1
299 .unwrap_or(prev_tw.0.add_duration(std_window_size).context(TimeSnafu)?);
300 prev_tw.1 = Some(prev_upper);
301
302 let cur_upper = upper_bound.unwrap_or(
303 lower_bound
304 .add_duration(std_window_size)
305 .context(TimeSnafu)?,
306 );
307
308 if lower_bound
309 .sub(&prev_upper)
310 .map(|dist| dist <= window_size * Self::MERGE_DIST)
311 .unwrap_or(false)
312 {
313 prev_tw.1 = Some(cur_upper);
314 } else {
315 new_windows.insert(prev_tw.0, prev_tw.1);
316 *prev_tw = (lower_bound, Some(cur_upper));
317 }
318 }
319
320 if let Some(prev_tw) = prev_tw {
321 new_windows.insert(prev_tw.0, prev_tw.1);
322 }
323
324 self.windows = new_windows;
325
326 Ok(())
327 }
328}
329
330fn to_df_literal(value: Timestamp) -> Result<datafusion_common::ScalarValue, Error> {
331 let value = Value::from(value);
332 let value = value
333 .try_to_scalar_value(&value.data_type())
334 .with_context(|_| DatatypesSnafu {
335 extra: format!("Failed to convert to scalar value: {}", value),
336 })?;
337 Ok(value)
338}
339
/// Execution state of a batching task.
#[derive(Clone, Debug)]
enum ExecState {
    /// No query is running; the state on creation and after a query finishes.
    Idle,
    /// A query is running.
    // NOTE(review): not set anywhere in the visible code — confirm where the task enters this state.
    Executing,
}
345
#[cfg(test)]
mod test {
    use pretty_assertions::assert_eq;
    use session::context::QueryContext;

    use super::*;
    use crate::batching_mode::time_window::find_time_window_expr;
    use crate::batching_mode::utils::sql_to_df_plan;
    use crate::test_utils::create_test_query_engine;

    /// Checks the merged window map and the SQL rendering of the filter built from it.
    #[test]
    fn test_merge_dirty_time_windows() {
        struct MergeCase {
            lower_bounds: Vec<Timestamp>,
            window_size: chrono::Duration,
            expire_lower_bound: Option<Timestamp>,
            expected_windows: BTreeMap<Timestamp, Option<Timestamp>>,
            expected_filter: Option<&'static str>,
        }

        let secs = Timestamp::new_second;
        let merge_dist = DirtyTimeWindows::MERGE_DIST as i64;
        let five_minutes = chrono::Duration::seconds(5 * 60);

        let cases = vec![
            // Gap is exactly MERGE_DIST windows: merged into one.
            MergeCase {
                lower_bounds: vec![secs(0), secs((1 + merge_dist) * 5 * 60)],
                window_size: five_minutes,
                expire_lower_bound: None,
                expected_windows: BTreeMap::from([(
                    secs(0),
                    Some(secs((2 + merge_dist) * 5 * 60)),
                )]),
                expected_filter: Some(
                    "((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:25:00' AS TIMESTAMP)))",
                ),
            },
            // Gap is one window larger than MERGE_DIST: kept separate.
            MergeCase {
                lower_bounds: vec![secs(0), secs((2 + merge_dist) * 5 * 60)],
                window_size: five_minutes,
                expire_lower_bound: None,
                expected_windows: BTreeMap::from([
                    (secs(0), Some(secs(5 * 60))),
                    (
                        secs((2 + merge_dist) * 5 * 60),
                        Some(secs((3 + merge_dist) * 5 * 60)),
                    ),
                ]),
                expected_filter: Some(
                    "(((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:05:00' AS TIMESTAMP))) OR ((ts >= CAST('1970-01-01 00:25:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:30:00' AS TIMESTAMP))))",
                ),
            },
            // Gap is smaller than MERGE_DIST windows: merged into one.
            MergeCase {
                lower_bounds: vec![secs(0), secs(merge_dist * 5 * 60)],
                window_size: five_minutes,
                expire_lower_bound: None,
                expected_windows: BTreeMap::from([(
                    secs(0),
                    Some(secs((1 + merge_dist) * 5 * 60)),
                )]),
                expected_filter: Some(
                    "((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:20:00' AS TIMESTAMP)))",
                ),
            },
            // Three windows chained together by successive merges.
            MergeCase {
                lower_bounds: vec![secs(0), secs(merge_dist * 3), secs(merge_dist * 3 * 2)],
                window_size: chrono::Duration::seconds(3),
                expire_lower_bound: None,
                expected_windows: BTreeMap::from([(secs(0), Some(secs(merge_dist * 7)))]),
                expected_filter: Some(
                    "((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:00:21' AS TIMESTAMP)))",
                ),
            },
            // Every window starts before the expiry bound: nothing is left.
            MergeCase {
                lower_bounds: vec![secs(0), secs(merge_dist * 5 * 60)],
                window_size: five_minutes,
                expire_lower_bound: Some(secs(merge_dist * 6 * 60)),
                expected_windows: BTreeMap::new(),
                expected_filter: None,
            },
        ];

        for case in cases {
            let MergeCase {
                lower_bounds,
                window_size,
                expire_lower_bound,
                expected_windows,
                expected_filter,
            } = case;

            let mut dirty = DirtyTimeWindows::default();
            dirty.add_lower_bounds(lower_bounds.into_iter());
            dirty
                .merge_dirty_time_windows(window_size, expire_lower_bound)
                .unwrap();
            assert_eq!(expected_windows, dirty.windows);

            let filter_expr = dirty
                .gen_filter_exprs("ts", expire_lower_bound, window_size, 0, None)
                .unwrap();

            let unparser = datafusion::sql::unparser::Unparser::default();
            let rendered = filter_expr
                .as_ref()
                .map(|expr| unparser.expr_to_sql(expr).unwrap().to_string());
            assert_eq!(expected_filter, rendered.as_deref());
        }
    }

    /// Checks that windows are snapped onto the boundaries of the query's time window expression.
    #[tokio::test]
    async fn test_align_time_window() {
        type TimeWindow = (Timestamp, Option<Timestamp>);
        struct TestCase {
            sql: String,
            aligns: Vec<(TimeWindow, TimeWindow)>,
        }

        let secs = Timestamp::new_second;
        let testcases: Vec<TestCase> = vec![TestCase {
            sql: "SELECT date_bin(INTERVAL '5 second', ts) AS time_window FROM numbers_with_ts GROUP BY time_window;".to_string(),
            aligns: vec![
                ((secs(3), None), (secs(0), None)),
                ((secs(8), None), (secs(5), None)),
                // An end already on a boundary is kept.
                ((secs(8), Some(secs(10))), (secs(5), Some(secs(10)))),
                // An end inside a window is moved to that window's upper bound.
                ((secs(8), Some(secs(9))), (secs(5), Some(secs(10)))),
            ],
        }];

        let query_engine = create_test_query_engine();
        let ctx = QueryContext::arc();
        for TestCase { sql, aligns } in testcases {
            let plan = sql_to_df_plan(ctx.clone(), query_engine.clone(), &sql, true)
                .await
                .unwrap();

            let (column_name, raw_expr, _, df_schema) = find_time_window_expr(
                &plan,
                query_engine.engine_state().catalog_manager().clone(),
                ctx.clone(),
            )
            .await
            .unwrap();

            let raw_expr = raw_expr.unwrap();
            let time_window_expr = TimeWindowExpr::from_expr(
                &raw_expr,
                &column_name,
                &df_schema,
                &query_engine.engine_state().session_state(),
            )
            .unwrap();

            let dirty = DirtyTimeWindows::default();
            for ((start, end), expected_after_align) in aligns {
                let after_align = dirty
                    .align_time_window(start, end, &time_window_expr)
                    .unwrap();
                assert_eq!(expected_after_align, after_align);
            }
        }
    }
}