1use std::collections::{BTreeMap, BTreeSet, HashMap};
19use std::time::Duration;
20
21use common_telemetry::debug;
22use common_telemetry::tracing::warn;
23use common_time::Timestamp;
24use datatypes::value::Value;
25use session::context::QueryContextRef;
26use snafu::{OptionExt, ResultExt, ensure};
27use tokio::sync::oneshot;
28use tokio::time::Instant;
29
30use crate::batching_mode::task::BatchingTask;
31use crate::batching_mode::time_window::TimeWindowExpr;
32use crate::error::{DatatypesSnafu, InternalSnafu, TimeSnafu, UnexpectedSnafu};
33use crate::metrics::{
34 METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_CNT, METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_SIZE,
35 METRIC_FLOW_BATCHING_ENGINE_STALLED_WINDOW_SIZE,
36};
37use crate::{Error, FlowId};
38
39#[derive(Debug)]
41pub struct TaskState {
42 pub(crate) query_ctx: QueryContextRef,
44 last_update_time: Instant,
46 last_query_duration: Duration,
48 last_exec_time_millis: Option<i64>,
50 pub(crate) dirty_time_windows: DirtyTimeWindows,
53 checkpoint_mode: CheckpointMode,
54 pending_fenced_repair: Option<FencedRepair>,
55 checkpoints: BTreeMap<u64, u64>,
58 incremental_disabled: bool,
62 exec_state: ExecState,
63 pub(crate) shutdown_rx: oneshot::Receiver<()>,
65 pub(crate) task_handle: Option<tokio::task::JoinHandle<()>>,
67}
68impl TaskState {
69 pub fn new(query_ctx: QueryContextRef, shutdown_rx: oneshot::Receiver<()>) -> Self {
70 Self::with_dirty_time_windows(query_ctx, shutdown_rx, DirtyTimeWindows::default())
71 }
72
73 pub fn with_dirty_time_windows(
74 query_ctx: QueryContextRef,
75 shutdown_rx: oneshot::Receiver<()>,
76 dirty_time_windows: DirtyTimeWindows,
77 ) -> Self {
78 Self {
79 query_ctx,
80 last_update_time: Instant::now(),
81 last_query_duration: Duration::from_secs(0),
82 last_exec_time_millis: None,
83 dirty_time_windows,
84 checkpoint_mode: CheckpointMode::FullSnapshot,
85 pending_fenced_repair: None,
86 checkpoints: Default::default(),
87 incremental_disabled: false,
88 exec_state: ExecState::Idle,
89 shutdown_rx,
90 task_handle: None,
91 }
92 }
93
94 pub fn after_query_exec(&mut self, elapsed: Duration, is_succ: bool) {
97 self.exec_state = ExecState::Idle;
98 self.last_query_duration = elapsed;
99 self.last_update_time = Instant::now();
100 if is_succ {
101 self.last_exec_time_millis = Some(common_time::util::current_time_millis());
102 }
103 }
104
105 pub fn last_execution_time_millis(&self) -> Option<i64> {
106 self.last_exec_time_millis
107 }
108
109 pub fn checkpoint_mode(&self) -> CheckpointMode {
110 self.checkpoint_mode
111 }
112
113 pub fn checkpoints(&self) -> &BTreeMap<u64, u64> {
114 &self.checkpoints
115 }
116
117 pub fn pending_fenced_repair(&self) -> Option<&FencedRepair> {
120 self.pending_fenced_repair.as_ref()
121 }
122
123 pub fn is_incremental_disabled(&self) -> bool {
124 self.incremental_disabled
125 }
126
127 pub fn disable_incremental(&mut self) {
130 self.incremental_disabled = true;
131 self.mark_full_snapshot();
132 }
133
134 pub fn mark_full_snapshot(&mut self) {
138 self.abandon_fenced_repair();
139 }
140
141 pub fn advance_checkpoints(&mut self, watermark_map: HashMap<u64, u64>) {
144 self.checkpoints = watermark_map.into_iter().collect();
145 self.pending_fenced_repair = None;
146 if !self.incremental_disabled {
147 self.checkpoint_mode = CheckpointMode::Incremental;
148 }
149 }
150
151 pub fn advance_incremental_checkpoints_with_participation(
154 &mut self,
155 participating_regions: &BTreeSet<u64>,
156 watermark_map: HashMap<u64, u64>,
157 ) {
158 for region_id in participating_regions {
159 if let Some(seq) = watermark_map.get(region_id) {
160 self.checkpoints.insert(*region_id, *seq);
161 }
162 }
163 if !self.incremental_disabled {
164 self.checkpoint_mode = CheckpointMode::Incremental;
165 }
166 self.pending_fenced_repair = None;
167 }
168
169 pub fn start_fenced_repair(&mut self, high: BTreeMap<u64, u64>) -> Option<&FencedRepair> {
174 if self.dirty_time_windows.is_empty() {
175 self.pending_fenced_repair = None;
176 return None;
177 }
178
179 let pending_windows = self.dirty_time_windows.clone();
180 self.dirty_time_windows.clean();
181 self.pending_fenced_repair = Some(FencedRepair {
182 high,
183 pending_windows,
184 });
185 self.checkpoint_mode = CheckpointMode::FullSnapshot;
186 self.pending_fenced_repair.as_ref()
187 }
188
189 pub fn finish_fenced_repair(&mut self) -> Option<BTreeMap<u64, u64>> {
192 let repair = self.pending_fenced_repair.take()?;
193 self.checkpoints = repair.high;
194 if !self.incremental_disabled {
195 self.checkpoint_mode = CheckpointMode::Incremental;
196 }
197 Some(self.checkpoints.clone())
198 }
199
200 pub fn abandon_fenced_repair(&mut self) -> bool {
203 self.checkpoint_mode = CheckpointMode::FullSnapshot;
204 let Some(repair) = self.pending_fenced_repair.take() else {
205 return false;
206 };
207
208 self.dirty_time_windows
209 .add_dirty_windows(&repair.pending_windows);
210 true
211 }
212
213 pub fn restore_scoped_windows(&mut self, filter: &FilterExprInfo) {
217 if let Some(repair) = self.pending_fenced_repair.as_mut() {
218 repair
219 .pending_windows
220 .add_windows(filter.time_ranges.clone());
221 return;
222 }
223
224 self.dirty_time_windows
225 .add_windows(filter.time_ranges.clone());
226 }
227
228 pub fn gen_scoped_filter_exprs(
231 &mut self,
232 col_name: &str,
233 expire_lower_bound: Option<Timestamp>,
234 window_size: chrono::Duration,
235 window_cnt: usize,
236 flow_id: FlowId,
237 task_ctx: Option<&BatchingTask>,
238 ) -> Result<Option<FilterExprInfo>, Error> {
239 if let Some(repair) = self.pending_fenced_repair.as_mut() {
240 let expr = repair.pending_windows.gen_filter_exprs(
241 col_name,
242 expire_lower_bound,
243 window_size,
244 window_cnt,
245 flow_id,
246 task_ctx,
247 )?;
248 if expr.is_some() || !repair.pending_windows.is_empty() {
249 return Ok(expr);
250 }
251
252 self.pending_fenced_repair = None;
256 }
257
258 self.dirty_time_windows.gen_filter_exprs(
259 col_name,
260 expire_lower_bound,
261 window_size,
262 window_cnt,
263 flow_id,
264 task_ctx,
265 )
266 }
267
268 pub fn fenced_repair_watermarks_match_high(
271 &self,
272 participating_regions: &BTreeSet<u64>,
273 watermark_map: &HashMap<u64, u64>,
274 ) -> bool {
275 let Some(repair) = self.pending_fenced_repair.as_ref() else {
276 return false;
277 };
278
279 !participating_regions.is_empty()
280 && participating_regions.len() == repair.high.len()
281 && watermark_map.len() == repair.high.len()
282 && participating_regions.iter().all(|region_id| {
283 repair
284 .high
285 .get(region_id)
286 .zip(watermark_map.get(region_id))
287 .is_some_and(|(high, watermark)| high == watermark)
288 })
289 }
290
291 pub fn fenced_repair_pending_is_empty(&self) -> bool {
293 self.pending_fenced_repair
294 .as_ref()
295 .is_some_and(|repair| repair.pending_windows.is_empty())
296 }
297
298 pub fn can_advance_full_snapshot_checkpoints(
301 &self,
302 participating_regions: &BTreeSet<u64>,
303 watermark_map: &HashMap<u64, u64>,
304 ) -> bool {
305 !participating_regions.is_empty()
306 && participating_regions.len() == watermark_map.len()
307 && participating_regions
308 .iter()
309 .all(|region_id| watermark_map.contains_key(region_id))
310 }
311
312 pub fn can_advance_incremental_checkpoints_with_participation(
315 &self,
316 participating_regions: &BTreeSet<u64>,
317 watermark_map: &HashMap<u64, u64>,
318 ) -> bool {
319 !self.incremental_disabled
320 && !self.checkpoints.is_empty()
321 && !participating_regions.is_empty()
322 && participating_regions.len() == watermark_map.len()
323 && participating_regions
324 .iter()
325 .all(|region_id| self.checkpoints.contains_key(region_id))
326 && participating_regions.iter().all(|region_id| {
327 let checkpoint = self.checkpoints.get(region_id);
328 watermark_map
329 .get(region_id)
330 .zip(checkpoint)
331 .is_some_and(|(seq, checkpoint)| seq >= checkpoint)
332 })
333 }
334
335 pub fn get_next_start_query_time(
353 &self,
354 flow_id: FlowId,
355 time_window_size: &Option<Duration>,
356 min_refresh_duration: Duration,
357 max_timeout: Option<Duration>,
358 max_filter_num_per_query: usize,
359 prefer_short_incremental_cadence: bool,
360 ) -> Instant {
361 let lower = time_window_size.unwrap_or(min_refresh_duration);
363 let next_duration = self.last_query_duration.max(lower);
364 let next_duration = if let Some(max_timeout) = max_timeout {
365 next_duration.min(max_timeout)
366 } else {
367 next_duration
368 };
369
370 if self
371 .pending_fenced_repair
372 .as_ref()
373 .is_some_and(|repair| !repair.pending_windows().is_empty())
374 {
375 debug!(
376 "Flow id = {}, active fenced repair still has pending windows, execute immediately",
377 flow_id,
378 );
379 return Instant::now();
380 }
381
382 let cur_dirty_window_size = self.dirty_time_windows.window_size();
383 let max_query_update_range = (*time_window_size)
385 .unwrap_or_default()
386 .mul_f64(max_filter_num_per_query as f64);
387 if cur_dirty_window_size < max_query_update_range {
390 if prefer_short_incremental_cadence {
391 let next_duration = self.last_query_duration.max(min_refresh_duration);
395 let next_duration = if let Some(max_timeout) = max_timeout {
396 next_duration.min(max_timeout)
397 } else {
398 next_duration
399 };
400 self.last_update_time + next_duration
401 } else {
402 self.last_update_time + next_duration
403 }
404 } else {
405 debug!(
408 "Flow id = {}, still have too many {} dirty time window({:?}), execute immediately",
409 flow_id,
410 self.dirty_time_windows.windows.len(),
411 self.dirty_time_windows.windows
412 );
413 Instant::now()
414 }
415 }
416}
417
418#[derive(Debug, Clone)]
421pub struct DirtyTimeWindows {
422 windows: BTreeMap<Timestamp, Option<Timestamp>>,
425 max_filter_num_per_query: usize,
427 time_window_merge_threshold: usize,
430}
431
432impl DirtyTimeWindows {
433 pub fn new(max_filter_num_per_query: usize, time_window_merge_threshold: usize) -> Self {
434 Self {
435 windows: BTreeMap::new(),
436 max_filter_num_per_query,
437 time_window_merge_threshold,
438 }
439 }
440
441 #[cfg(test)]
442 pub(crate) fn max_filter_num_per_query(&self) -> usize {
443 self.max_filter_num_per_query
444 }
445
446 #[cfg(test)]
447 pub(crate) fn time_window_merge_threshold(&self) -> usize {
448 self.time_window_merge_threshold
449 }
450}
451
452impl Default for DirtyTimeWindows {
453 fn default() -> Self {
454 Self {
455 windows: BTreeMap::new(),
456 max_filter_num_per_query: 20,
457 time_window_merge_threshold: 3,
458 }
459 }
460}
461
462impl DirtyTimeWindows {
/// Merge distance, expressed as a number of time windows.
///
/// NOTE(review): `merge_dirty_time_windows` uses the per-instance
/// `time_window_merge_threshold` rather than this constant; within this
/// file the constant is only referenced by tests.
pub const MERGE_DIST: i32 = 3;
467
468 pub fn add_lower_bounds(&mut self, lower_bounds: impl Iterator<Item = Timestamp>) {
474 for lower_bound in lower_bounds {
475 let entry = self.windows.entry(lower_bound);
476 entry.or_insert(None);
477 }
478 }
479
480 pub fn window_size(&self) -> Duration {
481 let mut ret = Duration::from_secs(0);
482 for (start, end) in &self.windows {
483 if let Some(end) = end
484 && let Some(duration) = end.sub(start)
485 {
486 ret += duration.to_std().unwrap_or_default();
487 }
488 }
489 ret
490 }
491
492 pub fn add_window(&mut self, start: Timestamp, end: Option<Timestamp>) {
493 self.add_or_merge_window(start, end);
494 }
495
496 pub fn add_windows(&mut self, time_ranges: Vec<(Timestamp, Timestamp)>) {
497 for (start, end) in time_ranges {
498 self.add_or_merge_window(start, Some(end));
499 }
500 }
501
502 pub fn add_dirty_windows(&mut self, dirty_windows: &DirtyTimeWindows) {
504 for (start, end) in &dirty_windows.windows {
505 self.add_or_merge_window(*start, *end);
506 }
507 }
508
509 fn add_or_merge_window(&mut self, start: Timestamp, end: Option<Timestamp>) {
510 self.windows
511 .entry(start)
512 .and_modify(|current_end| {
513 *current_end = Self::union_window_end(*current_end, end);
514 })
515 .or_insert(end);
516 }
517
518 fn union_window_end(
519 current_end: Option<Timestamp>,
520 incoming_end: Option<Timestamp>,
521 ) -> Option<Timestamp> {
522 match (current_end, incoming_end) {
523 (Some(current), Some(incoming)) => Some(current.max(incoming)),
524 (Some(end), None) | (None, Some(end)) => Some(end),
528 (None, None) => None,
529 }
530 }
531
532 pub fn clean(&mut self) {
534 self.windows.clear();
535 }
536
537 pub fn set_dirty(&mut self) {
540 self.add_or_merge_window(Timestamp::new_second(0), None);
541 }
542
543 pub fn len(&self) -> usize {
545 self.windows.len()
546 }
547
548 pub fn is_empty(&self) -> bool {
549 self.windows.is_empty()
550 }
551
552 pub fn effective_count(&self, window_size: &Duration) -> usize {
555 if self.windows.is_empty() {
556 return 0;
557 }
558 let window_size =
559 chrono::Duration::from_std(*window_size).unwrap_or(chrono::Duration::zero());
560 let total_window_time_range =
561 self.windows
562 .iter()
563 .fold(chrono::Duration::zero(), |acc, (start, end)| {
564 if let Some(end) = end {
565 acc + end.sub(start).unwrap_or(chrono::Duration::zero())
566 } else {
567 acc + window_size
568 }
569 });
570
571 if window_size.num_seconds() == 0 {
573 0
574 } else {
575 (total_window_time_range.num_seconds() / window_size.num_seconds()) as usize
576 }
577 }
578
579 pub fn gen_filter_exprs(
585 &mut self,
586 col_name: &str,
587 expire_lower_bound: Option<Timestamp>,
588 window_size: chrono::Duration,
589 window_cnt: usize,
590 flow_id: FlowId,
591 task_ctx: Option<&BatchingTask>,
592 ) -> Result<Option<FilterExprInfo>, Error> {
593 ensure!(
594 window_size.num_seconds() > 0,
595 UnexpectedSnafu {
596 reason: "window_size is zero, can't generate filter exprs",
597 }
598 );
599
600 debug!(
601 "expire_lower_bound: {:?}, window_size: {:?}",
602 expire_lower_bound.map(|t| t.to_iso8601_string()),
603 window_size
604 );
605 self.merge_dirty_time_windows(window_size, expire_lower_bound)?;
606
607 if self.windows.len() > window_cnt {
608 let first_time_window = self.windows.first_key_value();
609 let last_time_window = self.windows.last_key_value();
610
611 if let Some(task_ctx) = task_ctx {
612 warn!(
613 "Flow id = {:?}, too many time windows: {}, only the first {} are taken for this query, the group by expression might be wrong. Time window expr={:?}, expire_after={:?}, first_time_window={:?}, last_time_window={:?}, the original query: {:?}",
614 task_ctx.config.flow_id,
615 self.windows.len(),
616 window_cnt,
617 task_ctx.config.time_window_expr,
618 task_ctx.config.expire_after,
619 first_time_window,
620 last_time_window,
621 task_ctx.config.query
622 );
623 } else {
624 warn!(
625 "Flow id = {:?}, too many time windows: {}, only the first {} are taken for this query, the group by expression might be wrong. first_time_window={:?}, last_time_window={:?}",
626 flow_id,
627 self.windows.len(),
628 window_cnt,
629 first_time_window,
630 last_time_window
631 )
632 }
633 }
634
635 let max_time_range = window_size * window_cnt as i32;
637
638 let mut to_be_query = BTreeMap::new();
639 let mut new_windows = self.windows.clone();
640 let mut cur_time_range = chrono::Duration::zero();
641 for (idx, (start, end)) in self.windows.iter().enumerate() {
642 let first_end = start
643 .add_duration(window_size.to_std().unwrap())
644 .context(TimeSnafu)?;
645 let end = end.unwrap_or(first_end);
646
647 if cur_time_range >= max_time_range {
649 break;
650 }
651
652 if idx >= window_cnt {
654 break;
655 }
656
657 let Some(x) = end.sub(start) else {
658 continue;
659 };
660 if cur_time_range + x <= max_time_range {
661 to_be_query.insert(*start, Some(end));
662 new_windows.remove(start);
663 cur_time_range += x;
664 } else {
665 let surplus = max_time_range - cur_time_range;
668 if surplus.num_seconds() <= window_size.num_seconds() {
669 break;
671 }
672 let times = surplus.num_seconds() / window_size.num_seconds();
673
674 let split_offset = window_size * times as i32;
675 let split_at = start
676 .add_duration(split_offset.to_std().unwrap())
677 .context(TimeSnafu)?;
678 to_be_query.insert(*start, Some(split_at));
679
680 new_windows.remove(start);
682 new_windows.insert(split_at, Some(end));
683 cur_time_range += split_offset;
684 break;
685 }
686 }
687
688 self.windows = new_windows;
689
690 METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_CNT
691 .with_label_values(&[flow_id.to_string().as_str()])
692 .observe(to_be_query.len() as f64);
693
694 let full_time_range = to_be_query
695 .iter()
696 .fold(chrono::Duration::zero(), |acc, (start, end)| {
697 if let Some(end) = end {
698 acc + end.sub(start).unwrap_or(chrono::Duration::zero())
699 } else {
700 acc + window_size
701 }
702 })
703 .num_seconds() as f64;
704 METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_SIZE
705 .with_label_values(&[flow_id.to_string().as_str()])
706 .observe(full_time_range);
707
708 let stalled_time_range =
709 self.windows
710 .iter()
711 .fold(chrono::Duration::zero(), |acc, (start, end)| {
712 if let Some(end) = end {
713 acc + end.sub(start).unwrap_or(chrono::Duration::zero())
714 } else {
715 acc + window_size
716 }
717 });
718
719 METRIC_FLOW_BATCHING_ENGINE_STALLED_WINDOW_SIZE
720 .with_label_values(&[flow_id.to_string().as_str()])
721 .observe(stalled_time_range.num_seconds() as f64);
722
723 let std_window_size = window_size.to_std().map_err(|e| {
724 InternalSnafu {
725 reason: e.to_string(),
726 }
727 .build()
728 })?;
729
730 let mut expr_lst = vec![];
731 let mut time_ranges = vec![];
732 for (start, end) in to_be_query.into_iter() {
733 let (start, end) = if let Some(ctx) = task_ctx {
735 let Some(time_window_expr) = &ctx.config.time_window_expr else {
736 UnexpectedSnafu {
737 reason: "time_window_expr is not set",
738 }
739 .fail()?
740 };
741 self.align_time_window(start, end, time_window_expr)?
742 } else {
743 (start, end)
744 };
745 let end = end.unwrap_or(start.add_duration(std_window_size).context(TimeSnafu)?);
746 time_ranges.push((start, end));
747
748 debug!(
749 "Time window start: {:?}, end: {:?}",
750 start.to_iso8601_string(),
751 end.to_iso8601_string()
752 );
753
754 use datafusion_expr::{col, lit};
755 let lower = to_df_literal(start)?;
756 let upper = to_df_literal(end)?;
757 let expr = col(col_name)
758 .gt_eq(lit(lower))
759 .and(col(col_name).lt(lit(upper)));
760 expr_lst.push(expr);
761 }
762 let expr = expr_lst.into_iter().reduce(|a, b| a.or(b));
763 let ret = expr.map(|expr| FilterExprInfo {
764 expr,
765 col_name: col_name.to_string(),
766 time_ranges,
767 window_size,
768 });
769 Ok(ret)
770 }
771
772 fn align_time_window(
773 &self,
774 start: Timestamp,
775 end: Option<Timestamp>,
776 time_window_expr: &TimeWindowExpr,
777 ) -> Result<(Timestamp, Option<Timestamp>), Error> {
778 let align_start = time_window_expr.eval(start)?.0.context(UnexpectedSnafu {
779 reason: format!(
780 "Failed to align start time {:?} with time window expr {:?}",
781 start, time_window_expr
782 ),
783 })?;
784 let align_end = end
785 .and_then(|end| {
786 time_window_expr
787 .eval(end)
788 .map(|r| if r.0 == Some(end) { r.0 } else { r.1 })
790 .transpose()
791 })
792 .transpose()?;
793 Ok((align_start, align_end))
794 }
795
796 pub fn merge_dirty_time_windows(
800 &mut self,
801 window_size: chrono::Duration,
802 expire_lower_bound: Option<Timestamp>,
803 ) -> Result<(), Error> {
804 if self.windows.is_empty() {
805 return Ok(());
806 }
807
808 let mut new_windows = BTreeMap::new();
809
810 let std_window_size = window_size.to_std().map_err(|e| {
811 InternalSnafu {
812 reason: e.to_string(),
813 }
814 .build()
815 })?;
816
817 let mut prev_tw = None;
819 for (lower_bound, upper_bound) in std::mem::take(&mut self.windows) {
820 if let Some(expire_lower_bound) = expire_lower_bound
822 && lower_bound < expire_lower_bound
823 {
824 continue;
825 }
826
827 let Some(prev_tw) = &mut prev_tw else {
828 prev_tw = Some((lower_bound, upper_bound));
829 continue;
830 };
831
832 let prev_upper = prev_tw
835 .1
836 .unwrap_or(prev_tw.0.add_duration(std_window_size).context(TimeSnafu)?);
837 prev_tw.1 = Some(prev_upper);
838
839 let cur_upper = upper_bound.unwrap_or(
840 lower_bound
841 .add_duration(std_window_size)
842 .context(TimeSnafu)?,
843 );
844
845 if lower_bound
846 .sub(&prev_upper)
847 .map(|dist| dist <= window_size * self.time_window_merge_threshold as i32)
848 .unwrap_or(false)
849 {
850 prev_tw.1 = Some(cur_upper);
851 } else {
852 new_windows.insert(prev_tw.0, prev_tw.1);
853 *prev_tw = (lower_bound, Some(cur_upper));
854 }
855 }
856
857 if let Some(prev_tw) = prev_tw {
858 new_windows.insert(prev_tw.0, prev_tw.1);
859 }
860
861 self.windows = new_windows;
862
863 Ok(())
864 }
865}
866
867pub(crate) fn to_df_literal(value: Timestamp) -> Result<datafusion_common::ScalarValue, Error> {
868 let value = Value::from(value);
869 let value = value
870 .try_to_scalar_value(&value.data_type())
871 .with_context(|_| DatatypesSnafu {
872 extra: format!("Failed to convert to scalar value: {}", value),
873 })?;
874 Ok(value)
875}
876
/// Execution state of a task.
#[derive(Debug, Clone)]
enum ExecState {
    /// No query is running; the state a task starts in and returns to after
    /// each query.
    Idle,
    /// A query is running.
    Executing,
}
882
/// How a task advances its checkpoints.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CheckpointMode {
    /// Full-snapshot mode; the mode every task starts in.
    FullSnapshot,
    /// Incremental mode, entered after checkpoints were advanced while
    /// incremental checkpointing is enabled.
    Incremental,
}
888
889#[derive(Debug, Clone)]
892pub struct FencedRepair {
893 high: BTreeMap<u64, u64>,
894 pending_windows: DirtyTimeWindows,
895}
896
897impl FencedRepair {
898 pub fn high(&self) -> &BTreeMap<u64, u64> {
900 &self.high
901 }
902
903 pub fn pending_windows(&self) -> &DirtyTimeWindows {
905 &self.pending_windows
906 }
907}
908
909#[derive(Debug, Clone)]
911pub struct FilterExprInfo {
912 pub expr: datafusion_expr::Expr,
913 pub col_name: String,
914 pub time_ranges: Vec<(Timestamp, Timestamp)>,
915 pub window_size: chrono::Duration,
916}
917
918impl FilterExprInfo {
919 pub fn total_window_length(&self) -> chrono::Duration {
920 self.time_ranges
921 .iter()
922 .fold(chrono::Duration::zero(), |acc, (start, end)| {
923 acc + end.sub(start).unwrap_or(chrono::Duration::zero())
924 })
925 }
926
927 pub fn predicate_for_col(
928 &self,
929 col_name: &str,
930 ) -> Result<Option<datafusion_expr::Expr>, Error> {
931 use datafusion_common::Column;
932 use datafusion_expr::{Expr, lit};
933
934 let mut expr_lst = Vec::with_capacity(self.time_ranges.len());
935 for (start, end) in &self.time_ranges {
936 let lower = to_df_literal(*start)?;
937 let upper = to_df_literal(*end)?;
938 let filter_col = || Expr::Column(Column::new_unqualified(col_name));
939 expr_lst.push(
940 filter_col()
941 .gt_eq(lit(lower))
942 .and(filter_col().lt(lit(upper))),
943 );
944 }
945
946 Ok(expr_lst.into_iter().reduce(|a, b| a.or(b)))
947 }
948}
949
950#[cfg(test)]
951mod test {
952 use pretty_assertions::assert_eq;
953 use session::context::QueryContext;
954
955 use super::*;
956 use crate::batching_mode::time_window::find_time_window_expr;
957 use crate::batching_mode::utils::sql_to_df_plan;
958 use crate::test_utils::create_test_query_engine;
959
/// Only a successful query records the last execution time.
#[test]
fn test_task_state_records_last_execution_time() {
    let (_tx, rx) = tokio::sync::oneshot::channel();
    let mut state = TaskState::new(QueryContext::arc(), rx);
    let elapsed = std::time::Duration::from_millis(1);

    // Nothing recorded initially, nor after a failed run.
    assert!(state.last_execution_time_millis().is_none());
    state.after_query_exec(elapsed, false);
    assert!(state.last_execution_time_millis().is_none());

    // A successful run records a timestamp.
    state.after_query_exec(elapsed, true);
    assert_ne!(None, state.last_execution_time_millis());
}
973
/// Table-driven check of window merging and of the filter generated from
/// the merged windows.
///
/// Each case is `(lower bounds, (window size, expire lower bound),
/// expected merged windows, expected filter SQL)`.
#[test]
fn test_merge_dirty_time_windows() {
    let ts = Timestamp::new_second;
    let merge_dist = DirtyTimeWindows::default().time_window_merge_threshold as i64;
    let five_min = chrono::Duration::seconds(5 * 60);
    let three_sec = chrono::Duration::seconds(3);
    // Start of a window just beyond merge distance from [0, 60).
    let beyond_merge = 60 + 3 * (DirtyTimeWindows::MERGE_DIST as i64 + 1);

    let testcases = vec![
        // Two windows exactly at merge distance: merged.
        (
            vec![ts(0), ts((1 + merge_dist) * 5 * 60)],
            (five_min, None),
            BTreeMap::from([(ts(0), Some(ts((2 + merge_dist) * 5 * 60)))]),
            Some(
                "((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:25:00' AS TIMESTAMP)))",
            ),
        ),
        // Two windows beyond merge distance: kept apart.
        (
            vec![ts(0), ts((2 + merge_dist) * 5 * 60)],
            (five_min, None),
            BTreeMap::from([
                (ts(0), Some(ts(5 * 60))),
                (
                    ts((2 + merge_dist) * 5 * 60),
                    Some(ts((3 + merge_dist) * 5 * 60)),
                ),
            ]),
            Some(
                "(((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:05:00' AS TIMESTAMP))) OR ((ts >= CAST('1970-01-01 00:25:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:30:00' AS TIMESTAMP))))",
            ),
        ),
        // Two windows within merge distance: merged.
        (
            vec![ts(0), ts(merge_dist * 5 * 60)],
            (five_min, None),
            BTreeMap::from([(ts(0), Some(ts((1 + merge_dist) * 5 * 60)))]),
            Some(
                "((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:20:00' AS TIMESTAMP)))",
            ),
        ),
        // Three windows chained into one.
        (
            vec![ts(0), ts(merge_dist * 3), ts(merge_dist * 3 * 2)],
            (three_sec, None),
            BTreeMap::from([(ts(0), Some(ts(merge_dist * 7)))]),
            Some(
                "((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:00:21' AS TIMESTAMP)))",
            ),
        ),
        // Twenty adjacent windows plus one far away: only the first is queried.
        (
            Vec::from_iter(
                (0..20)
                    .map(|i| ts(i * 3))
                    .chain(std::iter::once(ts(beyond_merge))),
            ),
            (three_sec, None),
            BTreeMap::from([
                (ts(0), Some(ts(60))),
                (ts(beyond_merge), Some(ts(beyond_merge + 3))),
            ]),
            Some(
                "((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:01:00' AS TIMESTAMP)))",
            ),
        ),
        // Forty adjacent windows: merged, then split by the range budget.
        (
            Vec::from_iter((0..40).map(|i| ts(i * 3))),
            (three_sec, None),
            BTreeMap::from([(ts(0), Some(ts(40 * 3)))]),
            Some(
                "((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:01:00' AS TIMESTAMP)))",
            ),
        ),
        // One small window, then a long run that must be split.
        (
            Vec::from_iter(std::iter::once(ts(0)).chain((0..40).map(|i| ts(20 + i * 3)))),
            (three_sec, None),
            BTreeMap::from([(ts(0), Some(ts(3))), (ts(20), Some(ts(140)))]),
            Some(
                "(((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:00:03' AS TIMESTAMP))) OR ((ts >= CAST('1970-01-01 00:00:20' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:01:17' AS TIMESTAMP))))",
            ),
        ),
        // Every window expired.
        (
            vec![ts(0), ts(merge_dist * 5 * 60)],
            (five_min, Some(ts(merge_dist * 6 * 60))),
            BTreeMap::from([]),
            None,
        ),
    ];

    let unparser = datafusion::sql::unparser::Unparser::default();
    for (lower_bounds, (window_size, expire_lower_bound), expected, expected_filter_expr) in
        testcases
    {
        let mut dirty = DirtyTimeWindows::default();
        dirty.add_lower_bounds(lower_bounds.into_iter());
        dirty
            .merge_dirty_time_windows(window_size, expire_lower_bound)
            .unwrap();
        assert_eq!(expected, dirty.windows);

        let window_cnt = dirty.max_filter_num_per_query;
        let filter = dirty
            .gen_filter_exprs("ts", expire_lower_bound, window_size, window_cnt, 0, None)
            .unwrap();
        let to_sql = filter.map(|info| unparser.expr_to_sql(&info.expr).unwrap().to_string());
        assert_eq!(expected_filter_expr, to_sql.as_deref());
    }
}
1137
1138 #[tokio::test]
1139 async fn test_align_time_window() {
1140 type TimeWindow = (Timestamp, Option<Timestamp>);
1141 struct TestCase {
1142 sql: String,
1143 aligns: Vec<(TimeWindow, TimeWindow)>,
1144 }
1145 let testcases: Vec<TestCase> = vec![TestCase{
1146 sql: "SELECT date_bin(INTERVAL '5 second', ts) AS time_window FROM numbers_with_ts GROUP BY time_window;".to_string(),
1147 aligns: vec![
1148 ((Timestamp::new_second(3), None), (Timestamp::new_second(0), None)),
1149 ((Timestamp::new_second(8), None), (Timestamp::new_second(5), None)),
1150 ((Timestamp::new_second(8), Some(Timestamp::new_second(10))), (Timestamp::new_second(5), Some(Timestamp::new_second(10)))),
1151 ((Timestamp::new_second(8), Some(Timestamp::new_second(9))), (Timestamp::new_second(5), Some(Timestamp::new_second(10)))),
1152 ],
1153 }];
1154
1155 let query_engine = create_test_query_engine();
1156 let ctx = QueryContext::arc();
1157 for TestCase { sql, aligns } in testcases {
1158 let plan = sql_to_df_plan(ctx.clone(), query_engine.clone(), &sql, true)
1159 .await
1160 .unwrap();
1161
1162 let (column_name, time_window_expr, _, df_schema) = find_time_window_expr(
1163 &plan,
1164 query_engine.engine_state().catalog_manager().clone(),
1165 ctx.clone(),
1166 )
1167 .await
1168 .unwrap();
1169
1170 let time_window_expr = time_window_expr
1171 .map(|expr| {
1172 TimeWindowExpr::from_expr(
1173 &expr,
1174 &column_name,
1175 &df_schema,
1176 &query_engine.engine_state().session_state(),
1177 )
1178 })
1179 .transpose()
1180 .unwrap()
1181 .unwrap();
1182
1183 let dirty = DirtyTimeWindows::default();
1184 for (before_align, expected_after_align) in aligns {
1185 let after_align = dirty
1186 .align_time_window(before_align.0, before_align.1, &time_window_expr)
1187 .unwrap();
1188 assert_eq!(expected_after_align, after_align);
1189 }
1190 }
1191 }
1192
/// Advancing checkpoints switches to incremental mode; marking a full
/// snapshot switches back but keeps the checkpoints.
#[test]
fn test_task_state_checkpoint_mode_and_advancement() {
    let (_tx, rx) = tokio::sync::oneshot::channel();
    let mut state = TaskState::new(QueryContext::arc(), rx);
    let expected = BTreeMap::from([(1_u64, 10_u64), (2_u64, 20_u64)]);

    assert_eq!(state.checkpoint_mode(), CheckpointMode::FullSnapshot);
    assert!(state.checkpoints().is_empty());

    state.advance_checkpoints(HashMap::from([(1_u64, 10_u64), (2_u64, 20_u64)]));
    assert_eq!(state.checkpoint_mode(), CheckpointMode::Incremental);
    assert_eq!(state.checkpoints(), &expected);

    state.mark_full_snapshot();
    assert_eq!(state.checkpoint_mode(), CheckpointMode::FullSnapshot);
    assert_eq!(state.checkpoints(), &expected);
}
1216
/// Marking a full snapshot while a fenced repair is pending moves the
/// repair's windows back into the dirty set.
#[test]
fn test_mark_full_snapshot_restores_pending_fenced_repair_windows() {
    let (_tx, rx) = tokio::sync::oneshot::channel();
    let mut state = TaskState::new(QueryContext::arc(), rx);
    for (start, end) in [(10, 15), (100, 105)] {
        state
            .dirty_time_windows
            .add_window(Timestamp::new_second(start), Some(Timestamp::new_second(end)));
    }

    // Starting the repair moves both windows out of the dirty set.
    let started = state.start_fenced_repair(BTreeMap::from([(1_u64, 10_u64)]));
    assert!(started.is_some());
    assert!(state.dirty_time_windows.is_empty());
    let pending = state.pending_fenced_repair().unwrap().pending_windows();
    assert_eq!(pending.len(), 2);

    state.mark_full_snapshot();

    assert_eq!(state.checkpoint_mode(), CheckpointMode::FullSnapshot);
    assert!(state.pending_fenced_repair().is_none());
    assert_eq!(state.dirty_time_windows.len(), 2);
}
1248
/// Once incremental checkpointing is disabled, advancing checkpoints still
/// stores them but never leaves full-snapshot mode.
#[test]
fn test_disable_incremental_persists_full_snapshot_mode() {
    let (_tx, rx) = tokio::sync::oneshot::channel();
    let mut state = TaskState::new(QueryContext::arc(), rx);
    assert!(!state.is_incremental_disabled());

    state.disable_incremental();
    assert!(state.is_incremental_disabled());
    assert_eq!(state.checkpoint_mode(), CheckpointMode::FullSnapshot);

    // Checkpoints are recorded, but the mode must not change.
    let watermarks = HashMap::from([(1_u64, 10_u64), (2_u64, 20_u64)]);
    state.advance_checkpoints(watermarks);
    assert_eq!(state.checkpoint_mode(), CheckpointMode::FullSnapshot);
    let expected = BTreeMap::from([(1_u64, 10_u64), (2_u64, 20_u64)]);
    assert_eq!(state.checkpoints(), &expected);

    // Marking a full snapshot keeps the flag set.
    state.mark_full_snapshot();
    assert!(state.is_incremental_disabled());
    assert_eq!(state.checkpoint_mode(), CheckpointMode::FullSnapshot);
}
1275
1276 #[test]
1277 fn test_full_snapshot_checkpoint_advancement_requires_participating_regions() {
1278 let query_ctx = QueryContext::arc();
1279 let (_tx, rx) = tokio::sync::oneshot::channel();
1280 let state = TaskState::new(query_ctx, rx);
1281
1282 assert!(!state.can_advance_full_snapshot_checkpoints(&BTreeSet::new(), &HashMap::new()));
1283 assert!(!state.can_advance_full_snapshot_checkpoints(
1284 &BTreeSet::from([1_u64, 2_u64]),
1285 &HashMap::from([(1_u64, 10_u64)]),
1286 ));
1287 assert!(state.can_advance_full_snapshot_checkpoints(
1288 &BTreeSet::from([1_u64, 2_u64]),
1289 &HashMap::from([(1_u64, 10_u64), (2_u64, 20_u64)]),
1290 ));
1291 }
1292
1293 #[test]
1294 fn test_incremental_checkpoint_advancement_requires_participation_alignment() {
1295 let query_ctx = QueryContext::arc();
1296 let (_tx, rx) = tokio::sync::oneshot::channel();
1297 let mut state = TaskState::new(query_ctx, rx);
1298 state.advance_checkpoints(HashMap::from([(1_u64, 10_u64), (2_u64, 20_u64)]));
1299
1300 assert!(
1301 state.can_advance_incremental_checkpoints_with_participation(
1302 &BTreeSet::from([1_u64]),
1303 &HashMap::from([(1_u64, 11_u64)]),
1304 )
1305 );
1306 assert!(
1307 !state.can_advance_incremental_checkpoints_with_participation(
1308 &BTreeSet::from([1_u64, 2_u64]),
1309 &HashMap::from([(1_u64, 11_u64)]),
1310 )
1311 );
1312 assert!(
1313 !state.can_advance_incremental_checkpoints_with_participation(
1314 &BTreeSet::from([3_u64]),
1315 &HashMap::from([(3_u64, 11_u64)]),
1316 )
1317 );
1318 assert!(
1319 !state.can_advance_incremental_checkpoints_with_participation(
1320 &BTreeSet::from([1_u64]),
1321 &HashMap::from([(1_u64, 9_u64)]),
1322 )
1323 );
1324 assert!(
1325 state.can_advance_incremental_checkpoints_with_participation(
1326 &BTreeSet::from([1_u64, 2_u64]),
1327 &HashMap::from([(1_u64, 11_u64), (2_u64, 21_u64)]),
1328 )
1329 );
1330
1331 state.disable_incremental();
1332 assert!(
1333 !state.can_advance_incremental_checkpoints_with_participation(
1334 &BTreeSet::from([1_u64, 2_u64]),
1335 &HashMap::from([(1_u64, 12_u64), (2_u64, 22_u64)]),
1336 )
1337 );
1338 }
1339
1340 #[test]
1341 fn test_incremental_checkpoint_advancement_merges_participating_subset() {
1342 let query_ctx = QueryContext::arc();
1343 let (_tx, rx) = tokio::sync::oneshot::channel();
1344 let mut state = TaskState::new(query_ctx, rx);
1345 state.advance_checkpoints(HashMap::from([
1346 (1_u64, 10_u64),
1347 (2_u64, 20_u64),
1348 (3_u64, 30_u64),
1349 ]));
1350
1351 state.advance_incremental_checkpoints_with_participation(
1352 &BTreeSet::from([1_u64, 3_u64]),
1353 HashMap::from([(1_u64, 12_u64), (3_u64, 35_u64)]),
1354 );
1355
1356 assert_eq!(state.checkpoint_mode(), CheckpointMode::Incremental);
1357 assert_eq!(
1358 state.checkpoints(),
1359 &BTreeMap::from([(1_u64, 12_u64), (2_u64, 20_u64), (3_u64, 35_u64)])
1360 );
1361 }
1362
1363 #[test]
1364 fn test_filter_expr_info_predicate_for_col_empty_ranges() {
1365 let filter = FilterExprInfo {
1366 expr: datafusion_expr::col("ts"),
1367 col_name: "ts".to_string(),
1368 time_ranges: vec![],
1369 window_size: chrono::Duration::seconds(1),
1370 };
1371
1372 assert!(filter.predicate_for_col("time_window").unwrap().is_none());
1373 }
1374
1375 #[test]
1376 fn test_filter_expr_info_predicate_for_col_single_range() {
1377 let filter = FilterExprInfo {
1378 expr: datafusion_expr::col("ts"),
1379 col_name: "ts".to_string(),
1380 time_ranges: vec![(Timestamp::new_second(0), Timestamp::new_second(1))],
1381 window_size: chrono::Duration::seconds(1),
1382 };
1383
1384 let predicate = filter.predicate_for_col("time_window").unwrap().unwrap();
1385 let unparser = datafusion::sql::unparser::Unparser::default();
1386 assert_eq!(
1387 "((time_window >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (time_window < CAST('1970-01-01 00:00:01' AS TIMESTAMP)))",
1388 unparser.expr_to_sql(&predicate).unwrap().to_string()
1389 );
1390 }
1391
1392 #[test]
1393 fn test_filter_expr_info_predicate_for_col_multiple_ranges() {
1394 let filter = FilterExprInfo {
1395 expr: datafusion_expr::col("ts"),
1396 col_name: "ts".to_string(),
1397 time_ranges: vec![
1398 (Timestamp::new_second(0), Timestamp::new_second(1)),
1399 (Timestamp::new_second(10), Timestamp::new_second(11)),
1400 ],
1401 window_size: chrono::Duration::seconds(1),
1402 };
1403
1404 let predicate = filter.predicate_for_col("time_window").unwrap().unwrap();
1405 let unparser = datafusion::sql::unparser::Unparser::default();
1406 assert_eq!(
1407 "(((time_window >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (time_window < CAST('1970-01-01 00:00:01' AS TIMESTAMP))) OR ((time_window >= CAST('1970-01-01 00:00:10' AS TIMESTAMP)) AND (time_window < CAST('1970-01-01 00:00:11' AS TIMESTAMP))))",
1408 unparser.expr_to_sql(&predicate).unwrap().to_string()
1409 );
1410 }
1411
1412 fn state_with_past_update(age: Duration) -> TaskState {
1414 let query_ctx = QueryContext::arc();
1415 let (_tx, rx) = tokio::sync::oneshot::channel();
1416 let mut state = TaskState::new(query_ctx, rx);
1417 state.last_update_time = Instant::now() - age;
1418 state
1419 }
1420
1421 #[test]
1422 fn test_short_incremental_cadence_uses_min_refresh() {
1423 let state = state_with_past_update(Duration::from_secs(10));
1427
1428 let time_window_size = Some(Duration::from_secs(60)); let min_refresh = Duration::from_secs(5);
1430 let flow_id = 1;
1431
1432 let result = state.get_next_start_query_time(
1433 flow_id,
1434 &time_window_size,
1435 min_refresh,
1436 None,
1437 20,
1438 true, );
1440
1441 let expected = state.last_update_time + min_refresh;
1443 assert_eq!(result, expected);
1444 }
1445
1446 #[test]
1447 fn test_short_incremental_cadence_respects_last_query_duration() {
1448 let mut state = state_with_past_update(Duration::from_secs(10));
1449 state.last_query_duration = Duration::from_secs(20);
1450
1451 let time_window_size = Some(Duration::from_secs(60));
1452 let min_refresh = Duration::from_secs(5);
1453 let flow_id = 1;
1454
1455 let result = state.get_next_start_query_time(
1456 flow_id,
1457 &time_window_size,
1458 min_refresh,
1459 None,
1460 20,
1461 true,
1462 );
1463
1464 assert_eq!(result, state.last_update_time + state.last_query_duration);
1465 }
1466
1467 #[test]
1468 fn test_short_incremental_cadence_respects_max_timeout() {
1469 let mut state = state_with_past_update(Duration::from_secs(10));
1470 state.last_query_duration = Duration::from_secs(20);
1471
1472 let time_window_size = Some(Duration::from_secs(60));
1473 let min_refresh = Duration::from_secs(30);
1474 let max_timeout = Duration::from_secs(5);
1475 let flow_id = 1;
1476
1477 let result = state.get_next_start_query_time(
1478 flow_id,
1479 &time_window_size,
1480 min_refresh,
1481 Some(max_timeout),
1482 20,
1483 true,
1484 );
1485
1486 assert_eq!(result, state.last_update_time + max_timeout);
1487 }
1488
1489 #[test]
1490 fn test_full_snapshot_ignores_short_cadence() {
1491 let mut state = state_with_past_update(Duration::from_secs(10));
1494 state.last_query_duration = Duration::from_secs(1);
1496
1497 let time_window_size = Some(Duration::from_secs(60)); let min_refresh = Duration::from_secs(5);
1499 let flow_id = 1;
1500
1501 let result = state.get_next_start_query_time(
1502 flow_id,
1503 &time_window_size,
1504 min_refresh,
1505 None,
1506 20,
1507 false, );
1509
1510 let expected = state.last_update_time + Duration::from_secs(60);
1513 assert_eq!(result, expected);
1514 }
1515
1516 #[test]
1517 fn test_dirty_window_overflow_schedules_immediately_even_with_short_cadence() {
1518 let mut state = state_with_past_update(Duration::from_secs(10));
1521 state
1523 .dirty_time_windows
1524 .add_window(Timestamp::new_second(0), Some(Timestamp::new_second(3600)));
1525
1526 let time_window_size = Some(Duration::from_secs(1)); let min_refresh = Duration::from_secs(5);
1528 let flow_id = 1;
1529
1530 let result = state.get_next_start_query_time(
1532 flow_id,
1533 &time_window_size,
1534 min_refresh,
1535 None,
1536 1, true,
1538 );
1539 assert!(
1540 result <= Instant::now(),
1541 "dirty overflow should schedule immediately"
1542 );
1543
1544 let result2 = state.get_next_start_query_time(
1546 flow_id,
1547 &time_window_size,
1548 min_refresh,
1549 None,
1550 1,
1551 false,
1552 );
1553 assert!(
1554 result2 <= Instant::now(),
1555 "dirty overflow should schedule immediately"
1556 );
1557 }
1558
1559 #[test]
1560 fn test_pending_fenced_repair_schedules_immediately() {
1561 let mut state = state_with_past_update(Duration::from_secs(10));
1562 state
1563 .dirty_time_windows
1564 .add_window(Timestamp::new_second(0), Some(Timestamp::new_second(5)));
1565 state
1566 .start_fenced_repair(BTreeMap::from([(1_u64, 10_u64)]))
1567 .unwrap();
1568 assert!(state.dirty_time_windows.is_empty());
1569 assert!(!state.fenced_repair_pending_is_empty());
1570
1571 let result = state.get_next_start_query_time(
1572 1,
1573 &Some(Duration::from_secs(60)),
1574 Duration::from_secs(5),
1575 None,
1576 20,
1577 false,
1578 );
1579
1580 assert!(
1581 result <= Instant::now(),
1582 "pending fenced repair backlog should schedule immediately"
1583 );
1584 }
1585
1586 #[test]
1587 fn test_incremental_disabled_ignores_short_cadence() {
1588 let mut state = state_with_past_update(Duration::from_secs(10));
1598 state.last_query_duration = Duration::from_secs(1);
1599
1600 let time_window_size = Some(Duration::from_secs(60));
1601 let min_refresh = Duration::from_secs(5);
1602 let flow_id = 1;
1603
1604 let result = state.get_next_start_query_time(
1605 flow_id,
1606 &time_window_size,
1607 min_refresh,
1608 None,
1609 20,
1610 false, );
1612
1613 let expected = state.last_update_time + Duration::from_secs(60);
1615 assert_eq!(result, expected);
1616 }
1617}