Skip to main content

flow/batching_mode/
state.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Batching mode task state, which changes frequently
16//!
17
18use std::collections::{BTreeMap, BTreeSet, HashMap};
19use std::time::Duration;
20
21use common_telemetry::debug;
22use common_telemetry::tracing::warn;
23use common_time::Timestamp;
24use datatypes::value::Value;
25use session::context::QueryContextRef;
26use snafu::{OptionExt, ResultExt, ensure};
27use tokio::sync::oneshot;
28use tokio::time::Instant;
29
30use crate::batching_mode::task::BatchingTask;
31use crate::batching_mode::time_window::TimeWindowExpr;
32use crate::error::{DatatypesSnafu, InternalSnafu, TimeSnafu, UnexpectedSnafu};
33use crate::metrics::{
34    METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_CNT, METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_SIZE,
35    METRIC_FLOW_BATCHING_ENGINE_STALLED_WINDOW_SIZE,
36};
37use crate::{Error, FlowId};
38
/// The mutable, frequently-changing state of a [`BatchingTask`]: timing of the
/// last query run, dirty time windows still to be recomputed, checkpoint
/// bookkeeping for incremental execution, and shutdown/task handles.
#[derive(Debug)]
pub struct TaskState {
    /// Query context
    pub(crate) query_ctx: QueryContextRef,
    /// Instant the last query run finished (the creation time before any run).
    last_update_time: Instant,
    /// How long the last query run took.
    last_query_duration: Duration,
    /// Last successful execution time in unix timestamp milliseconds.
    last_exec_time_millis: Option<i64>,
    /// Live queue of dirty time windows (`start -> end`) that still need to be
    /// recomputed. Windows may overlap or sit close together until they are
    /// merged when filters are generated.
    pub(crate) dirty_time_windows: DirtyTimeWindows,
    /// `FullSnapshot` until a complete watermark proof has been recorded,
    /// `Incremental` afterwards unless incremental mode is disabled.
    checkpoint_mode: CheckpointMode,
    /// In-progress fenced repair: dirty windows moved out of
    /// `dirty_time_windows` that are being repaired under a frozen high
    /// watermark. `None` when no repair is active.
    pending_fenced_repair: Option<FencedRepair>,
    /// Region id -> last consumed watermark sequence. Incremental scans use
    /// this as the next lower sequence bound for each source region.
    checkpoints: BTreeMap<u64, u64>,
    /// Once set, the task will never attempt incremental mode again.
    /// Set when the flow's query shape is deterministically incompatible
    /// with incremental execution (e.g. unsupported aggregate expressions).
    incremental_disabled: bool,
    /// Execution state; reset to `ExecState::Idle` after each query run.
    exec_state: ExecState,
    /// Receiver signalled when the task should shut down.
    pub(crate) shutdown_rx: oneshot::Receiver<()>,
    /// Handle of the spawned task, once it has been started.
    pub(crate) task_handle: Option<tokio::task::JoinHandle<()>>,
}
68impl TaskState {
69    pub fn new(query_ctx: QueryContextRef, shutdown_rx: oneshot::Receiver<()>) -> Self {
70        Self::with_dirty_time_windows(query_ctx, shutdown_rx, DirtyTimeWindows::default())
71    }
72
73    pub fn with_dirty_time_windows(
74        query_ctx: QueryContextRef,
75        shutdown_rx: oneshot::Receiver<()>,
76        dirty_time_windows: DirtyTimeWindows,
77    ) -> Self {
78        Self {
79            query_ctx,
80            last_update_time: Instant::now(),
81            last_query_duration: Duration::from_secs(0),
82            last_exec_time_millis: None,
83            dirty_time_windows,
84            checkpoint_mode: CheckpointMode::FullSnapshot,
85            pending_fenced_repair: None,
86            checkpoints: Default::default(),
87            incremental_disabled: false,
88            exec_state: ExecState::Idle,
89            shutdown_rx,
90            task_handle: None,
91        }
92    }
93
94    /// called after last query is done
95    /// `is_succ` indicate whether the last query is successful
96    pub fn after_query_exec(&mut self, elapsed: Duration, is_succ: bool) {
97        self.exec_state = ExecState::Idle;
98        self.last_query_duration = elapsed;
99        self.last_update_time = Instant::now();
100        if is_succ {
101            self.last_exec_time_millis = Some(common_time::util::current_time_millis());
102        }
103    }
104
105    pub fn last_execution_time_millis(&self) -> Option<i64> {
106        self.last_exec_time_millis
107    }
108
109    pub fn checkpoint_mode(&self) -> CheckpointMode {
110        self.checkpoint_mode
111    }
112
113    pub fn checkpoints(&self) -> &BTreeMap<u64, u64> {
114        &self.checkpoints
115    }
116
117    /// Returns the in-progress fenced repair, if the task is repairing dirty
118    /// windows under a frozen full-snapshot high watermark.
119    pub fn pending_fenced_repair(&self) -> Option<&FencedRepair> {
120        self.pending_fenced_repair.as_ref()
121    }
122
123    pub fn is_incremental_disabled(&self) -> bool {
124        self.incremental_disabled
125    }
126
127    /// Permanently disable incremental mode for this task and
128    /// immediately fall back to full snapshot for the current cycle.
129    pub fn disable_incremental(&mut self) {
130        self.incremental_disabled = true;
131        self.mark_full_snapshot();
132    }
133
134    /// Move back to top-level FullSnapshot mode. If a fenced repair is active,
135    /// restore its not-yet-in-flight pending windows to the live dirty queue so
136    /// the moved backlog is not lost.
137    pub fn mark_full_snapshot(&mut self) {
138        self.abandon_fenced_repair();
139    }
140
141    /// Replace full-snapshot checkpoints with a complete watermark proof.
142    /// Clears fenced repair state and enters Incremental unless disabled.
143    pub fn advance_checkpoints(&mut self, watermark_map: HashMap<u64, u64>) {
144        self.checkpoints = watermark_map.into_iter().collect();
145        self.pending_fenced_repair = None;
146        if !self.incremental_disabled {
147            self.checkpoint_mode = CheckpointMode::Incremental;
148        }
149    }
150
151    /// Advance only the participating regions for an incremental delta query.
152    /// This also clears any stale fenced repair sub-state.
153    pub fn advance_incremental_checkpoints_with_participation(
154        &mut self,
155        participating_regions: &BTreeSet<u64>,
156        watermark_map: HashMap<u64, u64>,
157    ) {
158        for region_id in participating_regions {
159            if let Some(seq) = watermark_map.get(region_id) {
160                self.checkpoints.insert(*region_id, *seq);
161            }
162        }
163        if !self.incremental_disabled {
164            self.checkpoint_mode = CheckpointMode::Incremental;
165        }
166        self.pending_fenced_repair = None;
167    }
168
169    /// Start repairing the current live dirty windows under a frozen high `H`.
170    /// The current live backlog is moved into the fenced repair so successful
171    /// chunks are consumed from that backlog. New post-`H` dirty signals can
172    /// still arrive in the live queue while the fenced repair is active.
173    pub fn start_fenced_repair(&mut self, high: BTreeMap<u64, u64>) -> Option<&FencedRepair> {
174        if self.dirty_time_windows.is_empty() {
175            self.pending_fenced_repair = None;
176            return None;
177        }
178
179        let pending_windows = self.dirty_time_windows.clone();
180        self.dirty_time_windows.clean();
181        self.pending_fenced_repair = Some(FencedRepair {
182            high,
183            pending_windows,
184        });
185        self.checkpoint_mode = CheckpointMode::FullSnapshot;
186        self.pending_fenced_repair.as_ref()
187    }
188
189    /// Finish the fenced repair and promote the frozen high watermark to the
190    /// checkpoint map. Incremental-disabled flows stay in FullSnapshot mode.
191    pub fn finish_fenced_repair(&mut self) -> Option<BTreeMap<u64, u64>> {
192        let repair = self.pending_fenced_repair.take()?;
193        self.checkpoints = repair.high;
194        if !self.incremental_disabled {
195            self.checkpoint_mode = CheckpointMode::Incremental;
196        }
197        Some(self.checkpoints.clone())
198    }
199
200    /// Abandon the current fenced repair and restore all not-yet-in-flight
201    /// pending windows to the live dirty queue for a fresh scoped repair.
202    pub fn abandon_fenced_repair(&mut self) -> bool {
203        self.checkpoint_mode = CheckpointMode::FullSnapshot;
204        let Some(repair) = self.pending_fenced_repair.take() else {
205            return false;
206        };
207
208        self.dirty_time_windows
209            .add_dirty_windows(&repair.pending_windows);
210        true
211    }
212
213    /// Restore a scoped query's windows after a failed or unproven run. During
214    /// an active fenced repair this requeues into `pending_windows`; otherwise
215    /// it restores to the live dirty queue.
216    pub fn restore_scoped_windows(&mut self, filter: &FilterExprInfo) {
217        if let Some(repair) = self.pending_fenced_repair.as_mut() {
218            repair
219                .pending_windows
220                .add_windows(filter.time_ranges.clone());
221            return;
222        }
223
224        self.dirty_time_windows
225            .add_windows(filter.time_ranges.clone());
226    }
227
228    /// Generate the next scoped filter from the fenced-repair queue when active;
229    /// otherwise consume windows from the live dirty queue.
230    pub fn gen_scoped_filter_exprs(
231        &mut self,
232        col_name: &str,
233        expire_lower_bound: Option<Timestamp>,
234        window_size: chrono::Duration,
235        window_cnt: usize,
236        flow_id: FlowId,
237        task_ctx: Option<&BatchingTask>,
238    ) -> Result<Option<FilterExprInfo>, Error> {
239        if let Some(repair) = self.pending_fenced_repair.as_mut() {
240            let expr = repair.pending_windows.gen_filter_exprs(
241                col_name,
242                expire_lower_bound,
243                window_size,
244                window_cnt,
245                flow_id,
246                task_ctx,
247            )?;
248            if expr.is_some() || !repair.pending_windows.is_empty() {
249                return Ok(expr);
250            }
251
252            // All pending repair windows may have expired during merge. Clear
253            // the empty repair so this call can fall back to live dirty windows
254            // instead of routing future executions to an empty queue forever.
255            self.pending_fenced_repair = None;
256        }
257
258        self.dirty_time_windows.gen_filter_exprs(
259            col_name,
260            expire_lower_bound,
261            window_size,
262            window_cnt,
263            flow_id,
264            task_ctx,
265        )
266    }
267
268    /// Returns true only when the query result's participating regions and
269    /// terminal watermarks exactly match the fenced repair's frozen high `H`.
270    pub fn fenced_repair_watermarks_match_high(
271        &self,
272        participating_regions: &BTreeSet<u64>,
273        watermark_map: &HashMap<u64, u64>,
274    ) -> bool {
275        let Some(repair) = self.pending_fenced_repair.as_ref() else {
276            return false;
277        };
278
279        !participating_regions.is_empty()
280            && participating_regions.len() == repair.high.len()
281            && watermark_map.len() == repair.high.len()
282            && participating_regions.iter().all(|region_id| {
283                repair
284                    .high
285                    .get(region_id)
286                    .zip(watermark_map.get(region_id))
287                    .is_some_and(|(high, watermark)| high == watermark)
288            })
289    }
290
291    /// Whether the active fenced repair has drained all pending windows.
292    pub fn fenced_repair_pending_is_empty(&self) -> bool {
293        self.pending_fenced_repair
294            .as_ref()
295            .is_some_and(|repair| repair.pending_windows.is_empty())
296    }
297
298    /// Full-snapshot checkpoint advances require a watermark for every region
299    /// that participated in the query.
300    pub fn can_advance_full_snapshot_checkpoints(
301        &self,
302        participating_regions: &BTreeSet<u64>,
303        watermark_map: &HashMap<u64, u64>,
304    ) -> bool {
305        !participating_regions.is_empty()
306            && participating_regions.len() == watermark_map.len()
307            && participating_regions
308                .iter()
309                .all(|region_id| watermark_map.contains_key(region_id))
310    }
311
312    /// Incremental advances are limited to participating regions whose returned
313    /// watermark is not older than the stored checkpoint.
314    pub fn can_advance_incremental_checkpoints_with_participation(
315        &self,
316        participating_regions: &BTreeSet<u64>,
317        watermark_map: &HashMap<u64, u64>,
318    ) -> bool {
319        !self.incremental_disabled
320            && !self.checkpoints.is_empty()
321            && !participating_regions.is_empty()
322            && participating_regions.len() == watermark_map.len()
323            && participating_regions
324                .iter()
325                .all(|region_id| self.checkpoints.contains_key(region_id))
326            && participating_regions.iter().all(|region_id| {
327                let checkpoint = self.checkpoints.get(region_id);
328                watermark_map
329                    .get(region_id)
330                    .zip(checkpoint)
331                    .is_some_and(|(seq, checkpoint)| seq >= checkpoint)
332            })
333    }
334
335    /// Compute the next query delay based on the time window size or the last query duration.
336    /// Aiming to avoid too frequent queries. But also not too long delay.
337    ///
338    /// next wait time is calculated as:
339    /// last query duration, capped by [max(min_run_interval, time_window_size), max_timeout],
340    /// note at most wait for `max_timeout`.
341    ///
342    /// if current the dirty time range is longer than one query can handle,
343    /// execute immediately to faster clean up dirty time windows.
344    /// Active fenced repairs also execute immediately while pending windows
345    /// remain: the current backlog has moved out of live dirty windows and into
346    /// `pending_fenced_repair.pending_windows`.
347    ///
348    /// If `prefer_short_incremental_cadence` is true, run incremental queries
349    /// more often when there is no large dirty backlog. This only reduces the
350    /// chance of hitting a stale cursor after flush; it is not required for
351    /// correctness.
352    pub fn get_next_start_query_time(
353        &self,
354        flow_id: FlowId,
355        time_window_size: &Option<Duration>,
356        min_refresh_duration: Duration,
357        max_timeout: Option<Duration>,
358        max_filter_num_per_query: usize,
359        prefer_short_incremental_cadence: bool,
360    ) -> Instant {
361        // = last query duration, capped by [max(min_run_interval, time_window_size), max_timeout], note at most `max_timeout`
362        let lower = time_window_size.unwrap_or(min_refresh_duration);
363        let next_duration = self.last_query_duration.max(lower);
364        let next_duration = if let Some(max_timeout) = max_timeout {
365            next_duration.min(max_timeout)
366        } else {
367            next_duration
368        };
369
370        if self
371            .pending_fenced_repair
372            .as_ref()
373            .is_some_and(|repair| !repair.pending_windows().is_empty())
374        {
375            debug!(
376                "Flow id = {}, active fenced repair still has pending windows, execute immediately",
377                flow_id,
378            );
379            return Instant::now();
380        }
381
382        let cur_dirty_window_size = self.dirty_time_windows.window_size();
383        // compute how much time range can be handled in one query
384        let max_query_update_range = (*time_window_size)
385            .unwrap_or_default()
386            .mul_f64(max_filter_num_per_query as f64);
387        // if dirty time range is more than one query can handle, execute immediately
388        // to faster clean up dirty time windows
389        if cur_dirty_window_size < max_query_update_range {
390            if prefer_short_incremental_cadence {
391                // Run incremental queries sooner than the normal time-window
392                // cadence, while still backing off by at least the previous
393                // query duration and respecting the max-timeout cap.
394                let next_duration = self.last_query_duration.max(min_refresh_duration);
395                let next_duration = if let Some(max_timeout) = max_timeout {
396                    next_duration.min(max_timeout)
397                } else {
398                    next_duration
399                };
400                self.last_update_time + next_duration
401            } else {
402                self.last_update_time + next_duration
403            }
404        } else {
405            // if dirty time windows can't be clean up in one query, execute immediately to faster
406            // clean up dirty time windows
407            debug!(
408                "Flow id = {}, still have too many {} dirty time window({:?}), execute immediately",
409                flow_id,
410                self.dirty_time_windows.windows.len(),
411                self.dirty_time_windows.windows
412            );
413            Instant::now()
414        }
415    }
416}
417
/// Records dirty time windows, i.e. time ranges that received new data since
/// the last query.
#[derive(Debug, Clone)]
pub struct DirtyTimeWindows {
    /// Dirty windows as `start -> end`, ordered by `start`. `end` is exclusive.
    /// `None` marks a dirty start whose end is not known. Filter generation
    /// treats such a window as one `window_size` long. Windows may overlap or
    /// sit close together until `merge_dirty_time_windows` merges them.
    windows: BTreeMap<Timestamp, Option<Timestamp>>,
    /// Maximum number of filters allowed in a single query
    max_filter_num_per_query: usize,
    /// Time window merge distance.
    /// NOTE(review): presumably measured in multiples of the time window size;
    /// confirm against `merge_dirty_time_windows`.
    time_window_merge_threshold: usize,
}
431
432impl DirtyTimeWindows {
433    pub fn new(max_filter_num_per_query: usize, time_window_merge_threshold: usize) -> Self {
434        Self {
435            windows: BTreeMap::new(),
436            max_filter_num_per_query,
437            time_window_merge_threshold,
438        }
439    }
440
441    #[cfg(test)]
442    pub(crate) fn max_filter_num_per_query(&self) -> usize {
443        self.max_filter_num_per_query
444    }
445
446    #[cfg(test)]
447    pub(crate) fn time_window_merge_threshold(&self) -> usize {
448        self.time_window_merge_threshold
449    }
450}
451
452impl Default for DirtyTimeWindows {
453    fn default() -> Self {
454        Self {
455            windows: BTreeMap::new(),
456            max_filter_num_per_query: 20,
457            time_window_merge_threshold: 3,
458        }
459    }
460}
461
462impl DirtyTimeWindows {
    /// Time window merge distance, in multiples of the time window size.
    /// According to the comment in `merge_dirty_time_windows`, two windows are
    /// merged when `cur.lower - prev.upper <= window_size * MERGE_DIST`.
    ///
    /// TODO(discord9): make those configurable
    pub const MERGE_DIST: i32 = 3;
467
468    /// Add lower bounds to the dirty time windows. Upper bounds are ignored.
469    ///
470    /// # Arguments
471    ///
472    /// * `lower_bounds` - An iterator of lower bounds to be added.
473    pub fn add_lower_bounds(&mut self, lower_bounds: impl Iterator<Item = Timestamp>) {
474        for lower_bound in lower_bounds {
475            let entry = self.windows.entry(lower_bound);
476            entry.or_insert(None);
477        }
478    }
479
480    pub fn window_size(&self) -> Duration {
481        let mut ret = Duration::from_secs(0);
482        for (start, end) in &self.windows {
483            if let Some(end) = end
484                && let Some(duration) = end.sub(start)
485            {
486                ret += duration.to_std().unwrap_or_default();
487            }
488        }
489        ret
490    }
491
492    pub fn add_window(&mut self, start: Timestamp, end: Option<Timestamp>) {
493        self.add_or_merge_window(start, end);
494    }
495
496    pub fn add_windows(&mut self, time_ranges: Vec<(Timestamp, Timestamp)>) {
497        for (start, end) in time_ranges {
498            self.add_or_merge_window(start, Some(end));
499        }
500    }
501
502    /// Add all dirty markers from another dirty-window set.
503    pub fn add_dirty_windows(&mut self, dirty_windows: &DirtyTimeWindows) {
504        for (start, end) in &dirty_windows.windows {
505            self.add_or_merge_window(*start, *end);
506        }
507    }
508
509    fn add_or_merge_window(&mut self, start: Timestamp, end: Option<Timestamp>) {
510        self.windows
511            .entry(start)
512            .and_modify(|current_end| {
513                *current_end = Self::union_window_end(*current_end, end);
514            })
515            .or_insert(end);
516    }
517
518    fn union_window_end(
519        current_end: Option<Timestamp>,
520        incoming_end: Option<Timestamp>,
521    ) -> Option<Timestamp> {
522        match (current_end, incoming_end) {
523            (Some(current), Some(incoming)) => Some(current.max(incoming)),
524            // `None` is a dirty marker without a known upper bound.  When one
525            // side has a concrete end, keep it so merging a restored snapshot
526            // never shrinks an already-known dirty range with the same start.
527            (Some(end), None) | (None, Some(end)) => Some(end),
528            (None, None) => None,
529        }
530    }
531
532    /// Clean all dirty time windows, useful when can't found time window expr
533    pub fn clean(&mut self) {
534        self.windows.clear();
535    }
536
537    /// Set windows to be dirty, only useful for full aggr without time window
538    /// to mark some new data is inserted
539    pub fn set_dirty(&mut self) {
540        self.add_or_merge_window(Timestamp::new_second(0), None);
541    }
542
543    /// Number of dirty windows.
544    pub fn len(&self) -> usize {
545        self.windows.len()
546    }
547
548    pub fn is_empty(&self) -> bool {
549        self.windows.is_empty()
550    }
551
552    /// Get the effective count of time windows, which is the number of time windows that can be
553    /// used for query, compute from total time window range divided by `window_size`.
554    pub fn effective_count(&self, window_size: &Duration) -> usize {
555        if self.windows.is_empty() {
556            return 0;
557        }
558        let window_size =
559            chrono::Duration::from_std(*window_size).unwrap_or(chrono::Duration::zero());
560        let total_window_time_range =
561            self.windows
562                .iter()
563                .fold(chrono::Duration::zero(), |acc, (start, end)| {
564                    if let Some(end) = end {
565                        acc + end.sub(start).unwrap_or(chrono::Duration::zero())
566                    } else {
567                        acc + window_size
568                    }
569                });
570
571        // not sure window_size is zero have any meaning, but just in case
572        if window_size.num_seconds() == 0 {
573            0
574        } else {
575            (total_window_time_range.num_seconds() / window_size.num_seconds()) as usize
576        }
577    }
578
579    /// Generate all filter expressions consuming all time windows
580    ///
581    /// there is two limits:
582    /// - shouldn't return a too long time range(<=`window_size * window_cnt`), so that the query can be executed in a reasonable time
583    /// - shouldn't return too many time range exprs, so that the query can be parsed properly instead of causing parser to overflow
584    pub fn gen_filter_exprs(
585        &mut self,
586        col_name: &str,
587        expire_lower_bound: Option<Timestamp>,
588        window_size: chrono::Duration,
589        window_cnt: usize,
590        flow_id: FlowId,
591        task_ctx: Option<&BatchingTask>,
592    ) -> Result<Option<FilterExprInfo>, Error> {
593        ensure!(
594            window_size.num_seconds() > 0,
595            UnexpectedSnafu {
596                reason: "window_size is zero, can't generate filter exprs",
597            }
598        );
599
600        debug!(
601            "expire_lower_bound: {:?}, window_size: {:?}",
602            expire_lower_bound.map(|t| t.to_iso8601_string()),
603            window_size
604        );
605        self.merge_dirty_time_windows(window_size, expire_lower_bound)?;
606
607        if self.windows.len() > window_cnt {
608            let first_time_window = self.windows.first_key_value();
609            let last_time_window = self.windows.last_key_value();
610
611            if let Some(task_ctx) = task_ctx {
612                warn!(
613                    "Flow id = {:?}, too many time windows: {}, only the first {} are taken for this query, the group by expression might be wrong. Time window expr={:?}, expire_after={:?}, first_time_window={:?}, last_time_window={:?}, the original query: {:?}",
614                    task_ctx.config.flow_id,
615                    self.windows.len(),
616                    window_cnt,
617                    task_ctx.config.time_window_expr,
618                    task_ctx.config.expire_after,
619                    first_time_window,
620                    last_time_window,
621                    task_ctx.config.query
622                );
623            } else {
624                warn!(
625                    "Flow id = {:?}, too many time windows: {}, only the first {} are taken for this query, the group by expression might be wrong. first_time_window={:?}, last_time_window={:?}",
626                    flow_id,
627                    self.windows.len(),
628                    window_cnt,
629                    first_time_window,
630                    last_time_window
631                )
632            }
633        }
634
635        // get the first `window_cnt` time windows
636        let max_time_range = window_size * window_cnt as i32;
637
638        let mut to_be_query = BTreeMap::new();
639        let mut new_windows = self.windows.clone();
640        let mut cur_time_range = chrono::Duration::zero();
641        for (idx, (start, end)) in self.windows.iter().enumerate() {
642            let first_end = start
643                .add_duration(window_size.to_std().unwrap())
644                .context(TimeSnafu)?;
645            let end = end.unwrap_or(first_end);
646
647            // if time range is too long, stop
648            if cur_time_range >= max_time_range {
649                break;
650            }
651
652            // if we have enough time windows, stop
653            if idx >= window_cnt {
654                break;
655            }
656
657            let Some(x) = end.sub(start) else {
658                continue;
659            };
660            if cur_time_range + x <= max_time_range {
661                to_be_query.insert(*start, Some(end));
662                new_windows.remove(start);
663                cur_time_range += x;
664            } else {
665                // too large a window, split it
666                // split at window_size * times
667                let surplus = max_time_range - cur_time_range;
668                if surplus.num_seconds() <= window_size.num_seconds() {
669                    // Skip splitting if surplus is smaller than window_size
670                    break;
671                }
672                let times = surplus.num_seconds() / window_size.num_seconds();
673
674                let split_offset = window_size * times as i32;
675                let split_at = start
676                    .add_duration(split_offset.to_std().unwrap())
677                    .context(TimeSnafu)?;
678                to_be_query.insert(*start, Some(split_at));
679
680                // remove the original window
681                new_windows.remove(start);
682                new_windows.insert(split_at, Some(end));
683                cur_time_range += split_offset;
684                break;
685            }
686        }
687
688        self.windows = new_windows;
689
690        METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_CNT
691            .with_label_values(&[flow_id.to_string().as_str()])
692            .observe(to_be_query.len() as f64);
693
694        let full_time_range = to_be_query
695            .iter()
696            .fold(chrono::Duration::zero(), |acc, (start, end)| {
697                if let Some(end) = end {
698                    acc + end.sub(start).unwrap_or(chrono::Duration::zero())
699                } else {
700                    acc + window_size
701                }
702            })
703            .num_seconds() as f64;
704        METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_SIZE
705            .with_label_values(&[flow_id.to_string().as_str()])
706            .observe(full_time_range);
707
708        let stalled_time_range =
709            self.windows
710                .iter()
711                .fold(chrono::Duration::zero(), |acc, (start, end)| {
712                    if let Some(end) = end {
713                        acc + end.sub(start).unwrap_or(chrono::Duration::zero())
714                    } else {
715                        acc + window_size
716                    }
717                });
718
719        METRIC_FLOW_BATCHING_ENGINE_STALLED_WINDOW_SIZE
720            .with_label_values(&[flow_id.to_string().as_str()])
721            .observe(stalled_time_range.num_seconds() as f64);
722
723        let std_window_size = window_size.to_std().map_err(|e| {
724            InternalSnafu {
725                reason: e.to_string(),
726            }
727            .build()
728        })?;
729
730        let mut expr_lst = vec![];
731        let mut time_ranges = vec![];
732        for (start, end) in to_be_query.into_iter() {
733            // align using time window exprs
734            let (start, end) = if let Some(ctx) = task_ctx {
735                let Some(time_window_expr) = &ctx.config.time_window_expr else {
736                    UnexpectedSnafu {
737                        reason: "time_window_expr is not set",
738                    }
739                    .fail()?
740                };
741                self.align_time_window(start, end, time_window_expr)?
742            } else {
743                (start, end)
744            };
745            let end = end.unwrap_or(start.add_duration(std_window_size).context(TimeSnafu)?);
746            time_ranges.push((start, end));
747
748            debug!(
749                "Time window start: {:?}, end: {:?}",
750                start.to_iso8601_string(),
751                end.to_iso8601_string()
752            );
753
754            use datafusion_expr::{col, lit};
755            let lower = to_df_literal(start)?;
756            let upper = to_df_literal(end)?;
757            let expr = col(col_name)
758                .gt_eq(lit(lower))
759                .and(col(col_name).lt(lit(upper)));
760            expr_lst.push(expr);
761        }
762        let expr = expr_lst.into_iter().reduce(|a, b| a.or(b));
763        let ret = expr.map(|expr| FilterExprInfo {
764            expr,
765            col_name: col_name.to_string(),
766            time_ranges,
767            window_size,
768        });
769        Ok(ret)
770    }
771
772    fn align_time_window(
773        &self,
774        start: Timestamp,
775        end: Option<Timestamp>,
776        time_window_expr: &TimeWindowExpr,
777    ) -> Result<(Timestamp, Option<Timestamp>), Error> {
778        let align_start = time_window_expr.eval(start)?.0.context(UnexpectedSnafu {
779            reason: format!(
780                "Failed to align start time {:?} with time window expr {:?}",
781                start, time_window_expr
782            ),
783        })?;
784        let align_end = end
785            .and_then(|end| {
786                time_window_expr
787                    .eval(end)
788                    // if after aligned, end is the same, then use end(because it's already aligned) else use aligned end
789                    .map(|r| if r.0 == Some(end) { r.0 } else { r.1 })
790                    .transpose()
791            })
792            .transpose()?;
793        Ok((align_start, align_end))
794    }
795
796    /// Merge time windows that overlaps or get too close
797    ///
798    /// TODO(discord9): not merge and prefer to send smaller time windows? how?
799    pub fn merge_dirty_time_windows(
800        &mut self,
801        window_size: chrono::Duration,
802        expire_lower_bound: Option<Timestamp>,
803    ) -> Result<(), Error> {
804        if self.windows.is_empty() {
805            return Ok(());
806        }
807
808        let mut new_windows = BTreeMap::new();
809
810        let std_window_size = window_size.to_std().map_err(|e| {
811            InternalSnafu {
812                reason: e.to_string(),
813            }
814            .build()
815        })?;
816
817        // previous time window
818        let mut prev_tw = None;
819        for (lower_bound, upper_bound) in std::mem::take(&mut self.windows) {
820            // filter out expired time window
821            if let Some(expire_lower_bound) = expire_lower_bound
822                && lower_bound < expire_lower_bound
823            {
824                continue;
825            }
826
827            let Some(prev_tw) = &mut prev_tw else {
828                prev_tw = Some((lower_bound, upper_bound));
829                continue;
830            };
831
832            // if cur.lower - prev.upper <= window_size * MERGE_DIST, merge
833            // this also deal with overlap windows because cur.lower > prev.lower is always true
834            let prev_upper = prev_tw
835                .1
836                .unwrap_or(prev_tw.0.add_duration(std_window_size).context(TimeSnafu)?);
837            prev_tw.1 = Some(prev_upper);
838
839            let cur_upper = upper_bound.unwrap_or(
840                lower_bound
841                    .add_duration(std_window_size)
842                    .context(TimeSnafu)?,
843            );
844
845            if lower_bound
846                .sub(&prev_upper)
847                .map(|dist| dist <= window_size * self.time_window_merge_threshold as i32)
848                .unwrap_or(false)
849            {
850                prev_tw.1 = Some(cur_upper);
851            } else {
852                new_windows.insert(prev_tw.0, prev_tw.1);
853                *prev_tw = (lower_bound, Some(cur_upper));
854            }
855        }
856
857        if let Some(prev_tw) = prev_tw {
858            new_windows.insert(prev_tw.0, prev_tw.1);
859        }
860
861        self.windows = new_windows;
862
863        Ok(())
864    }
865}
866
867pub(crate) fn to_df_literal(value: Timestamp) -> Result<datafusion_common::ScalarValue, Error> {
868    let value = Value::from(value);
869    let value = value
870        .try_to_scalar_value(&value.data_type())
871        .with_context(|_| DatatypesSnafu {
872            extra: format!("Failed to convert to scalar value: {}", value),
873        })?;
874    Ok(value)
875}
876
/// Execution status of a task.
#[derive(Clone, Debug)]
enum ExecState {
    /// The task is idle.
    Idle,
    /// The task is executing.
    Executing,
}
882
/// Checkpoint mode of a batching task.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum CheckpointMode {
    /// Full-snapshot mode; a freshly created `TaskState` starts here.
    FullSnapshot,
    /// Incremental mode, entered after checkpoints have been advanced
    /// (unless incremental mode has been disabled).
    Incremental,
}
888
889/// Dirty windows that must be repaired under a frozen full-snapshot watermark.
890/// This is a FullSnapshot sub-state, not a separate checkpoint mode.
891#[derive(Debug, Clone)]
892pub struct FencedRepair {
893    high: BTreeMap<u64, u64>,
894    pending_windows: DirtyTimeWindows,
895}
896
897impl FencedRepair {
898    /// Frozen high watermark `H` used as the snapshot upper bound for chunks.
899    pub fn high(&self) -> &BTreeMap<u64, u64> {
900        &self.high
901    }
902
903    /// Dirty windows still waiting to be repaired under `high`.
904    pub fn pending_windows(&self) -> &DirtyTimeWindows {
905        &self.pending_windows
906    }
907}
908
909/// Filter Expression's information
910#[derive(Debug, Clone)]
911pub struct FilterExprInfo {
912    pub expr: datafusion_expr::Expr,
913    pub col_name: String,
914    pub time_ranges: Vec<(Timestamp, Timestamp)>,
915    pub window_size: chrono::Duration,
916}
917
918impl FilterExprInfo {
919    pub fn total_window_length(&self) -> chrono::Duration {
920        self.time_ranges
921            .iter()
922            .fold(chrono::Duration::zero(), |acc, (start, end)| {
923                acc + end.sub(start).unwrap_or(chrono::Duration::zero())
924            })
925    }
926
927    pub fn predicate_for_col(
928        &self,
929        col_name: &str,
930    ) -> Result<Option<datafusion_expr::Expr>, Error> {
931        use datafusion_common::Column;
932        use datafusion_expr::{Expr, lit};
933
934        let mut expr_lst = Vec::with_capacity(self.time_ranges.len());
935        for (start, end) in &self.time_ranges {
936            let lower = to_df_literal(*start)?;
937            let upper = to_df_literal(*end)?;
938            let filter_col = || Expr::Column(Column::new_unqualified(col_name));
939            expr_lst.push(
940                filter_col()
941                    .gt_eq(lit(lower))
942                    .and(filter_col().lt(lit(upper))),
943            );
944        }
945
946        Ok(expr_lst.into_iter().reduce(|a, b| a.or(b)))
947    }
948}
949
950#[cfg(test)]
951mod test {
952    use pretty_assertions::assert_eq;
953    use session::context::QueryContext;
954
955    use super::*;
956    use crate::batching_mode::time_window::find_time_window_expr;
957    use crate::batching_mode::utils::sql_to_df_plan;
958    use crate::test_utils::create_test_query_engine;
959
960    #[test]
961    fn test_task_state_records_last_execution_time() {
962        let query_ctx = QueryContext::arc();
963        let (_tx, rx) = tokio::sync::oneshot::channel();
964        let mut state = TaskState::new(query_ctx, rx);
965
966        assert_eq!(None, state.last_execution_time_millis());
967        state.after_query_exec(std::time::Duration::from_millis(1), false);
968        assert_eq!(None, state.last_execution_time_millis());
969
970        state.after_query_exec(std::time::Duration::from_millis(1), true);
971        assert!(state.last_execution_time_millis().is_some());
972    }
973
974    #[test]
975    fn test_merge_dirty_time_windows() {
976        let merge_dist = DirtyTimeWindows::default().time_window_merge_threshold;
977        let testcases = vec![
978            // just enough to merge
979            (
980                vec![
981                    Timestamp::new_second(0),
982                    Timestamp::new_second((1 + merge_dist as i64) * 5 * 60),
983                ],
984                (chrono::Duration::seconds(5 * 60), None),
985                BTreeMap::from([(
986                    Timestamp::new_second(0),
987                    Some(Timestamp::new_second((2 + merge_dist as i64) * 5 * 60)),
988                )]),
989                Some(
990                    "((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:25:00' AS TIMESTAMP)))",
991                ),
992            ),
993            // separate time window
994            (
995                vec![
996                    Timestamp::new_second(0),
997                    Timestamp::new_second((2 + merge_dist as i64) * 5 * 60),
998                ],
999                (chrono::Duration::seconds(5 * 60), None),
1000                BTreeMap::from([
1001                    (
1002                        Timestamp::new_second(0),
1003                        Some(Timestamp::new_second(5 * 60)),
1004                    ),
1005                    (
1006                        Timestamp::new_second((2 + merge_dist as i64) * 5 * 60),
1007                        Some(Timestamp::new_second((3 + merge_dist as i64) * 5 * 60)),
1008                    ),
1009                ]),
1010                Some(
1011                    "(((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:05:00' AS TIMESTAMP))) OR ((ts >= CAST('1970-01-01 00:25:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:30:00' AS TIMESTAMP))))",
1012                ),
1013            ),
1014            // overlapping
1015            (
1016                vec![
1017                    Timestamp::new_second(0),
1018                    Timestamp::new_second((merge_dist as i64) * 5 * 60),
1019                ],
1020                (chrono::Duration::seconds(5 * 60), None),
1021                BTreeMap::from([(
1022                    Timestamp::new_second(0),
1023                    Some(Timestamp::new_second((1 + merge_dist as i64) * 5 * 60)),
1024                )]),
1025                Some(
1026                    "((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:20:00' AS TIMESTAMP)))",
1027                ),
1028            ),
1029            // complex overlapping
1030            (
1031                vec![
1032                    Timestamp::new_second(0),
1033                    Timestamp::new_second((merge_dist as i64) * 3),
1034                    Timestamp::new_second((merge_dist as i64) * 3 * 2),
1035                ],
1036                (chrono::Duration::seconds(3), None),
1037                BTreeMap::from([(
1038                    Timestamp::new_second(0),
1039                    Some(Timestamp::new_second((merge_dist as i64) * 7)),
1040                )]),
1041                Some(
1042                    "((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:00:21' AS TIMESTAMP)))",
1043                ),
1044            ),
1045            // split range
1046            (
1047                Vec::from_iter((0..20).map(|i| Timestamp::new_second(i * 3)).chain(
1048                    std::iter::once(Timestamp::new_second(
1049                        60 + 3 * (DirtyTimeWindows::MERGE_DIST as i64 + 1),
1050                    )),
1051                )),
1052                (chrono::Duration::seconds(3), None),
1053                BTreeMap::from([
1054                    (Timestamp::new_second(0), Some(Timestamp::new_second(60))),
1055                    (
1056                        Timestamp::new_second(60 + 3 * (DirtyTimeWindows::MERGE_DIST as i64 + 1)),
1057                        Some(Timestamp::new_second(
1058                            60 + 3 * (DirtyTimeWindows::MERGE_DIST as i64 + 1) + 3,
1059                        )),
1060                    ),
1061                ]),
1062                Some(
1063                    "((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:01:00' AS TIMESTAMP)))",
1064                ),
1065            ),
1066            // split 2 min into 1 min
1067            (
1068                Vec::from_iter((0..40).map(|i| Timestamp::new_second(i * 3))),
1069                (chrono::Duration::seconds(3), None),
1070                BTreeMap::from([(
1071                    Timestamp::new_second(0),
1072                    Some(Timestamp::new_second(40 * 3)),
1073                )]),
1074                Some(
1075                    "((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:01:00' AS TIMESTAMP)))",
1076                ),
1077            ),
1078            // split 3s + 1min into 3s + 57s
1079            (
1080                Vec::from_iter(
1081                    std::iter::once(Timestamp::new_second(0))
1082                        .chain((0..40).map(|i| Timestamp::new_second(20 + i * 3))),
1083                ),
1084                (chrono::Duration::seconds(3), None),
1085                BTreeMap::from([
1086                    (Timestamp::new_second(0), Some(Timestamp::new_second(3))),
1087                    (Timestamp::new_second(20), Some(Timestamp::new_second(140))),
1088                ]),
1089                Some(
1090                    "(((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:00:03' AS TIMESTAMP))) OR ((ts >= CAST('1970-01-01 00:00:20' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:01:17' AS TIMESTAMP))))",
1091                ),
1092            ),
1093            // expired
1094            (
1095                vec![
1096                    Timestamp::new_second(0),
1097                    Timestamp::new_second((merge_dist as i64) * 5 * 60),
1098                ],
1099                (
1100                    chrono::Duration::seconds(5 * 60),
1101                    Some(Timestamp::new_second((merge_dist as i64) * 6 * 60)),
1102                ),
1103                BTreeMap::from([]),
1104                None,
1105            ),
1106        ];
1107        // let len = testcases.len();
1108        // let testcases = testcases[(len - 2)..(len - 1)].to_vec();
1109        for (lower_bounds, (window_size, expire_lower_bound), expected, expected_filter_expr) in
1110            testcases
1111        {
1112            let mut dirty = DirtyTimeWindows::default();
1113            dirty.add_lower_bounds(lower_bounds.into_iter());
1114            dirty
1115                .merge_dirty_time_windows(window_size, expire_lower_bound)
1116                .unwrap();
1117            assert_eq!(expected, dirty.windows);
1118            let filter_expr = dirty
1119                .gen_filter_exprs(
1120                    "ts",
1121                    expire_lower_bound,
1122                    window_size,
1123                    dirty.max_filter_num_per_query,
1124                    0,
1125                    None,
1126                )
1127                .unwrap()
1128                .map(|e| e.expr);
1129
1130            let unparser = datafusion::sql::unparser::Unparser::default();
1131            let to_sql = filter_expr
1132                .as_ref()
1133                .map(|e| unparser.expr_to_sql(e).unwrap().to_string());
1134            assert_eq!(expected_filter_expr, to_sql.as_deref());
1135        }
1136    }
1137
1138    #[tokio::test]
1139    async fn test_align_time_window() {
1140        type TimeWindow = (Timestamp, Option<Timestamp>);
1141        struct TestCase {
1142            sql: String,
1143            aligns: Vec<(TimeWindow, TimeWindow)>,
1144        }
1145        let testcases: Vec<TestCase> = vec![TestCase{
1146            sql: "SELECT date_bin(INTERVAL '5 second', ts) AS time_window FROM numbers_with_ts GROUP BY time_window;".to_string(),
1147            aligns: vec![
1148                ((Timestamp::new_second(3), None), (Timestamp::new_second(0), None)),
1149                ((Timestamp::new_second(8), None), (Timestamp::new_second(5), None)),
1150                ((Timestamp::new_second(8), Some(Timestamp::new_second(10))), (Timestamp::new_second(5), Some(Timestamp::new_second(10)))),
1151                ((Timestamp::new_second(8), Some(Timestamp::new_second(9))), (Timestamp::new_second(5), Some(Timestamp::new_second(10)))),
1152            ],
1153        }];
1154
1155        let query_engine = create_test_query_engine();
1156        let ctx = QueryContext::arc();
1157        for TestCase { sql, aligns } in testcases {
1158            let plan = sql_to_df_plan(ctx.clone(), query_engine.clone(), &sql, true)
1159                .await
1160                .unwrap();
1161
1162            let (column_name, time_window_expr, _, df_schema) = find_time_window_expr(
1163                &plan,
1164                query_engine.engine_state().catalog_manager().clone(),
1165                ctx.clone(),
1166            )
1167            .await
1168            .unwrap();
1169
1170            let time_window_expr = time_window_expr
1171                .map(|expr| {
1172                    TimeWindowExpr::from_expr(
1173                        &expr,
1174                        &column_name,
1175                        &df_schema,
1176                        &query_engine.engine_state().session_state(),
1177                    )
1178                })
1179                .transpose()
1180                .unwrap()
1181                .unwrap();
1182
1183            let dirty = DirtyTimeWindows::default();
1184            for (before_align, expected_after_align) in aligns {
1185                let after_align = dirty
1186                    .align_time_window(before_align.0, before_align.1, &time_window_expr)
1187                    .unwrap();
1188                assert_eq!(expected_after_align, after_align);
1189            }
1190        }
1191    }
1192
1193    #[test]
1194    fn test_task_state_checkpoint_mode_and_advancement() {
1195        let query_ctx = QueryContext::arc();
1196        let (_tx, rx) = tokio::sync::oneshot::channel();
1197        let mut state = TaskState::new(query_ctx, rx);
1198
1199        assert_eq!(state.checkpoint_mode(), CheckpointMode::FullSnapshot);
1200        assert!(state.checkpoints().is_empty());
1201
1202        state.advance_checkpoints(HashMap::from([(1_u64, 10_u64), (2_u64, 20_u64)]));
1203        assert_eq!(state.checkpoint_mode(), CheckpointMode::Incremental);
1204        assert_eq!(
1205            state.checkpoints(),
1206            &BTreeMap::from([(1_u64, 10_u64), (2_u64, 20_u64)])
1207        );
1208
1209        state.mark_full_snapshot();
1210        assert_eq!(state.checkpoint_mode(), CheckpointMode::FullSnapshot);
1211        assert_eq!(
1212            state.checkpoints(),
1213            &BTreeMap::from([(1_u64, 10_u64), (2_u64, 20_u64)])
1214        );
1215    }
1216
1217    #[test]
1218    fn test_mark_full_snapshot_restores_pending_fenced_repair_windows() {
1219        let query_ctx = QueryContext::arc();
1220        let (_tx, rx) = tokio::sync::oneshot::channel();
1221        let mut state = TaskState::new(query_ctx, rx);
1222        state
1223            .dirty_time_windows
1224            .add_window(Timestamp::new_second(10), Some(Timestamp::new_second(15)));
1225        state
1226            .dirty_time_windows
1227            .add_window(Timestamp::new_second(100), Some(Timestamp::new_second(105)));
1228
1229        state
1230            .start_fenced_repair(BTreeMap::from([(1_u64, 10_u64)]))
1231            .unwrap();
1232        assert!(state.dirty_time_windows.is_empty());
1233        assert_eq!(
1234            state
1235                .pending_fenced_repair()
1236                .unwrap()
1237                .pending_windows()
1238                .len(),
1239            2
1240        );
1241
1242        state.mark_full_snapshot();
1243
1244        assert_eq!(state.checkpoint_mode(), CheckpointMode::FullSnapshot);
1245        assert!(state.pending_fenced_repair().is_none());
1246        assert_eq!(state.dirty_time_windows.len(), 2);
1247    }
1248
1249    #[test]
1250    fn test_disable_incremental_persists_full_snapshot_mode() {
1251        let query_ctx = QueryContext::arc();
1252        let (_tx, rx) = tokio::sync::oneshot::channel();
1253        let mut state = TaskState::new(query_ctx, rx);
1254
1255        assert!(!state.is_incremental_disabled());
1256
1257        // After disable, mode becomes FullSnapshot and flag is set.
1258        state.disable_incremental();
1259        assert!(state.is_incremental_disabled());
1260        assert_eq!(state.checkpoint_mode(), CheckpointMode::FullSnapshot);
1261
1262        // `advance_checkpoints` will NOT transition to Incremental when disabled.
1263        state.advance_checkpoints(HashMap::from([(1_u64, 10_u64), (2_u64, 20_u64)]));
1264        assert_eq!(state.checkpoint_mode(), CheckpointMode::FullSnapshot);
1265        assert_eq!(
1266            state.checkpoints(),
1267            &BTreeMap::from([(1_u64, 10_u64), (2_u64, 20_u64)])
1268        );
1269
1270        // `mark_full_snapshot` does not re-enable incremental.
1271        state.mark_full_snapshot();
1272        assert!(state.is_incremental_disabled());
1273        assert_eq!(state.checkpoint_mode(), CheckpointMode::FullSnapshot);
1274    }
1275
1276    #[test]
1277    fn test_full_snapshot_checkpoint_advancement_requires_participating_regions() {
1278        let query_ctx = QueryContext::arc();
1279        let (_tx, rx) = tokio::sync::oneshot::channel();
1280        let state = TaskState::new(query_ctx, rx);
1281
1282        assert!(!state.can_advance_full_snapshot_checkpoints(&BTreeSet::new(), &HashMap::new()));
1283        assert!(!state.can_advance_full_snapshot_checkpoints(
1284            &BTreeSet::from([1_u64, 2_u64]),
1285            &HashMap::from([(1_u64, 10_u64)]),
1286        ));
1287        assert!(state.can_advance_full_snapshot_checkpoints(
1288            &BTreeSet::from([1_u64, 2_u64]),
1289            &HashMap::from([(1_u64, 10_u64), (2_u64, 20_u64)]),
1290        ));
1291    }
1292
1293    #[test]
1294    fn test_incremental_checkpoint_advancement_requires_participation_alignment() {
1295        let query_ctx = QueryContext::arc();
1296        let (_tx, rx) = tokio::sync::oneshot::channel();
1297        let mut state = TaskState::new(query_ctx, rx);
1298        state.advance_checkpoints(HashMap::from([(1_u64, 10_u64), (2_u64, 20_u64)]));
1299
1300        assert!(
1301            state.can_advance_incremental_checkpoints_with_participation(
1302                &BTreeSet::from([1_u64]),
1303                &HashMap::from([(1_u64, 11_u64)]),
1304            )
1305        );
1306        assert!(
1307            !state.can_advance_incremental_checkpoints_with_participation(
1308                &BTreeSet::from([1_u64, 2_u64]),
1309                &HashMap::from([(1_u64, 11_u64)]),
1310            )
1311        );
1312        assert!(
1313            !state.can_advance_incremental_checkpoints_with_participation(
1314                &BTreeSet::from([3_u64]),
1315                &HashMap::from([(3_u64, 11_u64)]),
1316            )
1317        );
1318        assert!(
1319            !state.can_advance_incremental_checkpoints_with_participation(
1320                &BTreeSet::from([1_u64]),
1321                &HashMap::from([(1_u64, 9_u64)]),
1322            )
1323        );
1324        assert!(
1325            state.can_advance_incremental_checkpoints_with_participation(
1326                &BTreeSet::from([1_u64, 2_u64]),
1327                &HashMap::from([(1_u64, 11_u64), (2_u64, 21_u64)]),
1328            )
1329        );
1330
1331        state.disable_incremental();
1332        assert!(
1333            !state.can_advance_incremental_checkpoints_with_participation(
1334                &BTreeSet::from([1_u64, 2_u64]),
1335                &HashMap::from([(1_u64, 12_u64), (2_u64, 22_u64)]),
1336            )
1337        );
1338    }
1339
1340    #[test]
1341    fn test_incremental_checkpoint_advancement_merges_participating_subset() {
1342        let query_ctx = QueryContext::arc();
1343        let (_tx, rx) = tokio::sync::oneshot::channel();
1344        let mut state = TaskState::new(query_ctx, rx);
1345        state.advance_checkpoints(HashMap::from([
1346            (1_u64, 10_u64),
1347            (2_u64, 20_u64),
1348            (3_u64, 30_u64),
1349        ]));
1350
1351        state.advance_incremental_checkpoints_with_participation(
1352            &BTreeSet::from([1_u64, 3_u64]),
1353            HashMap::from([(1_u64, 12_u64), (3_u64, 35_u64)]),
1354        );
1355
1356        assert_eq!(state.checkpoint_mode(), CheckpointMode::Incremental);
1357        assert_eq!(
1358            state.checkpoints(),
1359            &BTreeMap::from([(1_u64, 12_u64), (2_u64, 20_u64), (3_u64, 35_u64)])
1360        );
1361    }
1362
1363    #[test]
1364    fn test_filter_expr_info_predicate_for_col_empty_ranges() {
1365        let filter = FilterExprInfo {
1366            expr: datafusion_expr::col("ts"),
1367            col_name: "ts".to_string(),
1368            time_ranges: vec![],
1369            window_size: chrono::Duration::seconds(1),
1370        };
1371
1372        assert!(filter.predicate_for_col("time_window").unwrap().is_none());
1373    }
1374
1375    #[test]
1376    fn test_filter_expr_info_predicate_for_col_single_range() {
1377        let filter = FilterExprInfo {
1378            expr: datafusion_expr::col("ts"),
1379            col_name: "ts".to_string(),
1380            time_ranges: vec![(Timestamp::new_second(0), Timestamp::new_second(1))],
1381            window_size: chrono::Duration::seconds(1),
1382        };
1383
1384        let predicate = filter.predicate_for_col("time_window").unwrap().unwrap();
1385        let unparser = datafusion::sql::unparser::Unparser::default();
1386        assert_eq!(
1387            "((time_window >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (time_window < CAST('1970-01-01 00:00:01' AS TIMESTAMP)))",
1388            unparser.expr_to_sql(&predicate).unwrap().to_string()
1389        );
1390    }
1391
1392    #[test]
1393    fn test_filter_expr_info_predicate_for_col_multiple_ranges() {
1394        let filter = FilterExprInfo {
1395            expr: datafusion_expr::col("ts"),
1396            col_name: "ts".to_string(),
1397            time_ranges: vec![
1398                (Timestamp::new_second(0), Timestamp::new_second(1)),
1399                (Timestamp::new_second(10), Timestamp::new_second(11)),
1400            ],
1401            window_size: chrono::Duration::seconds(1),
1402        };
1403
1404        let predicate = filter.predicate_for_col("time_window").unwrap().unwrap();
1405        let unparser = datafusion::sql::unparser::Unparser::default();
1406        assert_eq!(
1407            "(((time_window >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (time_window < CAST('1970-01-01 00:00:01' AS TIMESTAMP))) OR ((time_window >= CAST('1970-01-01 00:00:10' AS TIMESTAMP)) AND (time_window < CAST('1970-01-01 00:00:11' AS TIMESTAMP))))",
1408            unparser.expr_to_sql(&predicate).unwrap().to_string()
1409        );
1410    }
1411
1412    /// Helper: create a `TaskState` whose `last_update_time` is a known duration in the past.
1413    fn state_with_past_update(age: Duration) -> TaskState {
1414        let query_ctx = QueryContext::arc();
1415        let (_tx, rx) = tokio::sync::oneshot::channel();
1416        let mut state = TaskState::new(query_ctx, rx);
1417        state.last_update_time = Instant::now() - age;
1418        state
1419    }
1420
1421    #[test]
1422    fn test_short_incremental_cadence_uses_min_refresh() {
1423        // When prefer_short_incremental_cadence is true and dirty backlog is manageable,
1424        // the next start time should be last_update_time + min_refresh (short cadence),
1425        // ignoring the longer time_window_size.
1426        let state = state_with_past_update(Duration::from_secs(10));
1427
1428        let time_window_size = Some(Duration::from_secs(60)); // large window
1429        let min_refresh = Duration::from_secs(5);
1430        let flow_id = 1;
1431
1432        let result = state.get_next_start_query_time(
1433            flow_id,
1434            &time_window_size,
1435            min_refresh,
1436            None,
1437            20,
1438            true, // prefer_short_incremental_cadence
1439        );
1440
1441        // With short cadence, result should be last_update_time + min_refresh.
1442        let expected = state.last_update_time + min_refresh;
1443        assert_eq!(result, expected);
1444    }
1445
1446    #[test]
1447    fn test_short_incremental_cadence_respects_last_query_duration() {
1448        let mut state = state_with_past_update(Duration::from_secs(10));
1449        state.last_query_duration = Duration::from_secs(20);
1450
1451        let time_window_size = Some(Duration::from_secs(60));
1452        let min_refresh = Duration::from_secs(5);
1453        let flow_id = 1;
1454
1455        let result = state.get_next_start_query_time(
1456            flow_id,
1457            &time_window_size,
1458            min_refresh,
1459            None,
1460            20,
1461            true,
1462        );
1463
1464        assert_eq!(result, state.last_update_time + state.last_query_duration);
1465    }
1466
1467    #[test]
1468    fn test_short_incremental_cadence_respects_max_timeout() {
1469        let mut state = state_with_past_update(Duration::from_secs(10));
1470        state.last_query_duration = Duration::from_secs(20);
1471
1472        let time_window_size = Some(Duration::from_secs(60));
1473        let min_refresh = Duration::from_secs(30);
1474        let max_timeout = Duration::from_secs(5);
1475        let flow_id = 1;
1476
1477        let result = state.get_next_start_query_time(
1478            flow_id,
1479            &time_window_size,
1480            min_refresh,
1481            Some(max_timeout),
1482            20,
1483            true,
1484        );
1485
1486        assert_eq!(result, state.last_update_time + max_timeout);
1487    }
1488
1489    #[test]
1490    fn test_full_snapshot_ignores_short_cadence() {
1491        // When prefer_short_incremental_cadence is false (full snapshot mode),
1492        // the normal long-cadence based on time_window_size applies.
1493        let mut state = state_with_past_update(Duration::from_secs(10));
1494        // Make last_query_duration small so the lower bound (time_window_size) dominates.
1495        state.last_query_duration = Duration::from_secs(1);
1496
1497        let time_window_size = Some(Duration::from_secs(60)); // large window
1498        let min_refresh = Duration::from_secs(5);
1499        let flow_id = 1;
1500
1501        let result = state.get_next_start_query_time(
1502            flow_id,
1503            &time_window_size,
1504            min_refresh,
1505            None,
1506            20,
1507            false, // prefer_short_incremental_cadence = false
1508        );
1509
1510        // With normal cadence, result should be last_update_time + time_window_size
1511        // (since last_query_duration < time_window_size).
1512        let expected = state.last_update_time + Duration::from_secs(60);
1513        assert_eq!(result, expected);
1514    }
1515
1516    #[test]
1517    fn test_dirty_window_overflow_schedules_immediately_even_with_short_cadence() {
1518        // Dirty-window overflow must always schedule immediately,
1519        // regardless of prefer_short_incremental_cadence.
1520        let mut state = state_with_past_update(Duration::from_secs(10));
1521        // Create a very large dirty backlog.
1522        state
1523            .dirty_time_windows
1524            .add_window(Timestamp::new_second(0), Some(Timestamp::new_second(3600)));
1525
1526        let time_window_size = Some(Duration::from_secs(1)); // tiny window => overflow
1527        let min_refresh = Duration::from_secs(5);
1528        let flow_id = 1;
1529
1530        // With short cadence flag.
1531        let result = state.get_next_start_query_time(
1532            flow_id,
1533            &time_window_size,
1534            min_refresh,
1535            None,
1536            1, // max 1 filter => tiny capacity
1537            true,
1538        );
1539        assert!(
1540            result <= Instant::now(),
1541            "dirty overflow should schedule immediately"
1542        );
1543
1544        // Without short cadence flag — same behavior.
1545        let result2 = state.get_next_start_query_time(
1546            flow_id,
1547            &time_window_size,
1548            min_refresh,
1549            None,
1550            1,
1551            false,
1552        );
1553        assert!(
1554            result2 <= Instant::now(),
1555            "dirty overflow should schedule immediately"
1556        );
1557    }
1558
1559    #[test]
1560    fn test_pending_fenced_repair_schedules_immediately() {
1561        let mut state = state_with_past_update(Duration::from_secs(10));
1562        state
1563            .dirty_time_windows
1564            .add_window(Timestamp::new_second(0), Some(Timestamp::new_second(5)));
1565        state
1566            .start_fenced_repair(BTreeMap::from([(1_u64, 10_u64)]))
1567            .unwrap();
1568        assert!(state.dirty_time_windows.is_empty());
1569        assert!(!state.fenced_repair_pending_is_empty());
1570
1571        let result = state.get_next_start_query_time(
1572            1,
1573            &Some(Duration::from_secs(60)),
1574            Duration::from_secs(5),
1575            None,
1576            20,
1577            false,
1578        );
1579
1580        assert!(
1581            result <= Instant::now(),
1582            "pending fenced repair backlog should schedule immediately"
1583        );
1584    }
1585
1586    #[test]
1587    fn test_incremental_disabled_ignores_short_cadence() {
1588        // When prefer_short_incremental_cadence is true but the dirty backlog is
1589        // manageable, the short cadence is applied. This test verifies that the
1590        // caller-side guard (checkpoint_mode + !is_incremental_disabled) controls
1591        // whether short cadence is requested at all — when incremental is disabled,
1592        // the flag is false, and the long cadence applies.
1593        //
1594        // This simulates the case where the caller computed
1595        // prefer_short_incremental_cadence = false (e.g. incremental disabled
1596        // or FullSnapshot mode), so the long cadence is used.
1597        let mut state = state_with_past_update(Duration::from_secs(10));
1598        state.last_query_duration = Duration::from_secs(1);
1599
1600        let time_window_size = Some(Duration::from_secs(60));
1601        let min_refresh = Duration::from_secs(5);
1602        let flow_id = 1;
1603
1604        let result = state.get_next_start_query_time(
1605            flow_id,
1606            &time_window_size,
1607            min_refresh,
1608            None,
1609            20,
1610            false, // prefer_short_incremental_cadence = false
1611        );
1612
1613        // With normal cadence, result should be last_update_time + time_window_size.
1614        let expected = state.last_update_time + Duration::from_secs(60);
1615        assert_eq!(result, expected);
1616    }
1617}