Skip to main content

flow/batching_mode/
eval_schedule.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Helpers for stable `EVAL INTERVAL` scheduled times.
16
17pub use common_meta::key::flow::flow_info::{FlowMissedTickPolicy, FlowScheduleConfig};
18use snafu::ensure;
19
20use crate::error::{InvalidQuerySnafu, Result};
21
/// Schedule for an `EVAL INTERVAL` flow.
///
/// Scheduled times form the sequence `start_secs + k * interval_secs` for
/// `k >= 0`. [`EvalSchedule::from_config`] builds a schedule and validates the
/// interval as well as any explicitly provided catch-up limits.
#[derive(Debug, Clone, PartialEq)]
pub struct EvalSchedule {
    /// Interval between scheduled times in seconds.
    ///
    /// [`select_due_scheduled_times`] returns `None` when this is not positive.
    pub interval_secs: i64,
    /// Anchor timestamp as seconds since Unix epoch.
    ///
    /// NOTE(review): not read by the helpers in this file; presumably the
    /// anchor callers pass to [`ceil_to_boundary`] when deriving `start_secs`
    /// — confirm against the caller.
    pub anchor_secs: i64,
    /// First scheduled time as seconds since Unix epoch.
    ///
    /// A cursor earlier than this value always resolves to `start_secs` as the
    /// next scheduled time.
    pub start_secs: i64,
    /// Policy for handling missed scheduled times.
    pub missed_tick_policy: FlowMissedTickPolicy,
    /// Maximum number of due scheduled times to catch up.
    ///
    /// Only consulted under `FlowMissedTickPolicy::BoundedCatchUp`, where it
    /// caps how many scheduled times one scheduler pass returns.
    pub max_runs: u32,
    /// Maximum age of a due scheduled time to keep for catch-up.
    ///
    /// Only consulted under `FlowMissedTickPolicy::BoundedCatchUp`: scheduled
    /// times earlier than `wall_now_secs - max_lag_secs` are skipped.
    pub max_lag_secs: i64,
}
38
39impl EvalSchedule {
40    pub fn from_config(
41        eval_interval_secs: Option<i64>,
42        config: Option<&FlowScheduleConfig>,
43    ) -> Result<Option<Self>> {
44        let Some(interval_secs) = eval_interval_secs else {
45            return Ok(None);
46        };
47        ensure!(
48            interval_secs > 0,
49            InvalidQuerySnafu {
50                reason: format!(
51                    "Invalid eval_interval_secs: must be positive, got {interval_secs}"
52                )
53            }
54        );
55
56        Ok(Some(match config {
57            Some(c) => {
58                ensure!(
59                    c.catchup_max_runs > 0,
60                    InvalidQuerySnafu {
61                        reason:
62                            "Invalid FlowScheduleConfig.catchup_max_runs: must be positive, got 0"
63                                .to_string()
64                    }
65                );
66                ensure!(
67                    c.catchup_max_lag_secs > 0,
68                    InvalidQuerySnafu {
69                        reason: format!(
70                            "Invalid FlowScheduleConfig.catchup_max_lag_secs: must be positive, got {}",
71                            c.catchup_max_lag_secs
72                        )
73                    }
74                );
75
76                Self {
77                    interval_secs,
78                    anchor_secs: c.anchor_secs,
79                    start_secs: c.start_secs,
80                    missed_tick_policy: c.missed_tick_policy,
81                    max_runs: c.catchup_max_runs,
82                    max_lag_secs: c.catchup_max_lag_secs,
83                }
84            }
85            None => {
86                let c = FlowScheduleConfig::default_with_start(0, interval_secs);
87                Self {
88                    interval_secs,
89                    anchor_secs: c.anchor_secs,
90                    start_secs: c.start_secs,
91                    missed_tick_policy: c.missed_tick_policy,
92                    max_runs: c.catchup_max_runs,
93                    max_lag_secs: c.catchup_max_lag_secs,
94                }
95            }
96        }))
97    }
98
99    /// Returns the next scheduled time strictly after `cursor_secs`.
100    pub fn next_scheduled_time_after(&self, cursor_secs: i64) -> i64 {
101        next_in_sequence(cursor_secs, self.start_secs, self.interval_secs)
102    }
103}
104
/// Returns the first element of `start + k * interval` (`k >= 0`) that is
/// strictly greater than `cursor`.
///
/// - When `interval` is not positive there is no sequence to follow; the
///   result is `cursor + 1` (saturating) or `start`, whichever is larger.
/// - When `cursor` is before `start`, the result is `start`.
/// - A result beyond `i64::MAX` is clamped to `i64::MAX`.
fn next_in_sequence(cursor: i64, start: i64, interval: i64) -> i64 {
    if interval <= 0 {
        return cursor.saturating_add(1).max(start);
    }
    if cursor < start {
        return start;
    }

    // `cursor - start` can exceed the i64 range when the operands have
    // opposite signs, and so can `k + 1` and `(k + 1) * interval`. Doing the
    // arithmetic in i128 keeps every intermediate exact; only the final value
    // is clamped back into i64.
    let start = i128::from(start);
    let interval = i128::from(interval);
    let elapsed_intervals = (i128::from(cursor) - start) / interval;
    let next = start + (elapsed_intervals + 1) * interval;

    next.clamp(i128::from(i64::MIN), i128::from(i64::MAX)) as i64
}
116
117fn first_due_in_sequence(cursor: i64, start: i64, interval: i64) -> i64 {
118    if interval <= 0 {
119        return cursor.saturating_add(1).max(start);
120    }
121    if cursor < start {
122        start
123    } else {
124        next_in_sequence(cursor, start, interval)
125    }
126}
127
/// Scheduled times selected for execution in one scheduler pass.
///
/// A scheduled time is the logical evaluation timestamp for one flow run. When
/// executing a timestamp from `scheduled_times_secs`, SQL/TQL `now()` is bound
/// to that timestamp instead of the wall-clock execution time.
///
/// Produced by [`select_due_scheduled_times`].
#[derive(Debug, Clone, PartialEq)]
pub struct DueScheduledTimes {
    /// Scheduled times to execute, ordered oldest to newest.
    ///
    /// Empty when nothing is due yet, or when every due time was skipped.
    pub scheduled_times_secs: Vec<i64>,
    /// Number of due scheduled times that were not selected for execution.
    ///
    /// Under `FlowMissedTickPolicy::Skip` this counts every due time except
    /// the latest one; under `FlowMissedTickPolicy::BoundedCatchUp` it counts
    /// the due times dropped by the lag or max-runs limits.
    pub skipped: u64,
}
140
141/// Select due scheduled times `<= wall_now_secs` without materializing all missed ticks.
142pub fn select_due_scheduled_times(
143    schedule: &EvalSchedule,
144    cursor_secs: i64,
145    wall_now_secs: i64,
146) -> Option<DueScheduledTimes> {
147    if schedule.interval_secs <= 0 {
148        return None;
149    }
150
151    let first_due = first_due_in_sequence(cursor_secs, schedule.start_secs, schedule.interval_secs);
152    if first_due > wall_now_secs {
153        return Some(DueScheduledTimes {
154            scheduled_times_secs: vec![],
155            skipped: 0,
156        });
157    }
158
159    let total_count = ((wall_now_secs - first_due) / schedule.interval_secs) as u64 + 1;
160    match schedule.missed_tick_policy {
161        FlowMissedTickPolicy::Skip => {
162            let last = first_due + (total_count as i64 - 1) * schedule.interval_secs;
163            Some(DueScheduledTimes {
164                scheduled_times_secs: vec![last],
165                skipped: total_count.saturating_sub(1),
166            })
167        }
168        FlowMissedTickPolicy::BoundedCatchUp => {
169            let cutoff = wall_now_secs.saturating_sub(schedule.max_lag_secs);
170            let skipped_by_cutoff = if first_due >= cutoff {
171                0
172            } else {
173                ((cutoff - first_due + schedule.interval_secs - 1) / schedule.interval_secs) as u64
174            }
175            .min(total_count);
176
177            let remaining = total_count.saturating_sub(skipped_by_cutoff);
178            if remaining == 0 {
179                return Some(DueScheduledTimes {
180                    scheduled_times_secs: vec![],
181                    skipped: total_count,
182                });
183            }
184
185            // max_lag_secs decides which missed scheduled times are recent enough to
186            // run; max_runs caps how many of those times we execute
187            // back-to-back in one scheduler pass.
188            let keep_count = remaining.min(schedule.max_runs as u64);
189            let keep_start = skipped_by_cutoff + remaining.saturating_sub(keep_count);
190            let scheduled_times_secs = (0..keep_count)
191                .map(|i| first_due + (keep_start as i64 + i as i64) * schedule.interval_secs)
192                .collect::<Vec<_>>();
193
194            Some(DueScheduledTimes {
195                scheduled_times_secs,
196                skipped: total_count - keep_count,
197            })
198        }
199    }
200}
201
/// Rounds `time` up to the nearest `anchor + k * interval` boundary (`k >= 0`).
///
/// - A non-positive `interval` leaves `time` unchanged.
/// - A `time` at or before `anchor` resolves to `anchor`.
/// - A `time` already on a boundary is returned as is.
/// - A boundary beyond `i64::MAX` is clamped to `i64::MAX`.
pub fn ceil_to_boundary(time: i64, anchor: i64, interval: i64) -> i64 {
    if interval <= 0 {
        return time;
    }
    if time <= anchor {
        return anchor;
    }

    // Widen to i128 so neither the offset nor the rounded value can overflow.
    let (base, step) = (i128::from(anchor), i128::from(interval));
    let offset = i128::from(time) - base;
    let overshoot = offset % step;
    let rounded_offset = if overshoot == 0 {
        offset
    } else {
        offset - overshoot + step
    };

    // The boundary is never below `time`, so only the upper bound can be
    // exceeded when narrowing back to i64.
    i64::try_from(base + rounded_offset).unwrap_or(i64::MAX)
}
218
/// Unit tests for schedule construction, boundary rounding and due-time
/// selection.
#[cfg(test)]
mod test {
    use super::*;

    /// Builds a schedule with a fixed 60-second interval and anchor 0; the
    /// remaining fields come from the arguments.
    fn schedule(
        start: i64,
        policy: FlowMissedTickPolicy,
        max_runs: u32,
        max_lag_secs: i64,
    ) -> EvalSchedule {
        EvalSchedule {
            interval_secs: 60,
            anchor_secs: 0,
            start_secs: start,
            missed_tick_policy: policy,
            max_runs,
            max_lag_secs,
        }
    }

    /// Builds a valid typed config whose values all differ from zero, so a
    /// field mapped from the wrong source is visible in assertions.
    fn config(policy: FlowMissedTickPolicy) -> FlowScheduleConfig {
        FlowScheduleConfig {
            anchor_secs: 10,
            start_secs: 70,
            missed_tick_policy: policy,
            catchup_max_runs: 4,
            catchup_max_lag_secs: 600,
        }
    }

    #[test]
    fn ceil_to_boundary_handles_anchor_and_interval_edges() {
        // At or before the anchor resolves to the anchor itself.
        assert_eq!(ceil_to_boundary(-10, 0, 60), 0);
        assert_eq!(ceil_to_boundary(0, 0, 60), 0);
        // Just past a boundary rounds up; exactly on a boundary stays put.
        assert_eq!(ceil_to_boundary(1, 0, 60), 60);
        assert_eq!(ceil_to_boundary(60, 0, 60), 60);
        assert_eq!(ceil_to_boundary(101, 100, 60), 160);
        // A non-positive interval returns the input unchanged.
        assert_eq!(ceil_to_boundary(50, 0, 0), 50);
        // Boundaries beyond the i64 range clamp to i64::MAX.
        assert_eq!(ceil_to_boundary(i64::MAX, 0, 60), i64::MAX);
        assert_eq!(ceil_to_boundary(i64::MAX - 1, i64::MIN, 60), i64::MAX);
    }

    #[test]
    fn from_config_maps_typed_config_and_defaults() {
        // No interval means no schedule; a zero interval is rejected.
        assert!(EvalSchedule::from_config(None, None).unwrap().is_none());
        assert!(EvalSchedule::from_config(Some(0), None).is_err());

        // A typed config is copied field by field.
        let from_typed =
            EvalSchedule::from_config(Some(300), Some(&config(FlowMissedTickPolicy::Skip)))
                .unwrap()
                .unwrap();
        assert_eq!(from_typed.interval_secs, 300);
        assert_eq!(from_typed.anchor_secs, 10);
        assert_eq!(from_typed.start_secs, 70);
        assert_eq!(from_typed.missed_tick_policy, FlowMissedTickPolicy::Skip);
        assert_eq!(from_typed.max_runs, 4);
        assert_eq!(from_typed.max_lag_secs, 600);

        // Without a config the values come from
        // `FlowScheduleConfig::default_with_start(0, 300)`.
        let defaulted = EvalSchedule::from_config(Some(300), None).unwrap().unwrap();
        assert_eq!(defaulted.start_secs, 0);
        assert_eq!(defaulted.max_runs, 3);
        assert_eq!(defaulted.max_lag_secs, 900);
    }

    #[test]
    fn from_config_rejects_invalid_catchup_limits() {
        let mut c = config(FlowMissedTickPolicy::BoundedCatchUp);
        c.catchup_max_runs = 0;
        assert!(EvalSchedule::from_config(Some(300), Some(&c)).is_err());

        let mut c = config(FlowMissedTickPolicy::BoundedCatchUp);
        c.catchup_max_lag_secs = 0;
        assert!(EvalSchedule::from_config(Some(300), Some(&c)).is_err());
    }

    #[test]
    fn next_scheduled_time_after_respects_start_sequence() {
        // Sequence is 50, 110, 170, ...
        let s = schedule(50, FlowMissedTickPolicy::BoundedCatchUp, 3, 300);
        // Before the start: the start itself is next.
        assert_eq!(s.next_scheduled_time_after(0), 50);
        // Exactly on a scheduled time: the result is strictly after it.
        assert_eq!(s.next_scheduled_time_after(50), 110);
        assert_eq!(s.next_scheduled_time_after(100), 110);
    }

    #[test]
    fn due_scheduled_time_selection_handles_empty_and_start_boundary() {
        let s = schedule(120, FlowMissedTickPolicy::BoundedCatchUp, 10, 3600);
        // The first scheduled time (120) is still in the future at 100.
        assert_eq!(
            select_due_scheduled_times(&s, 0, 100)
                .unwrap()
                .scheduled_times_secs,
            Vec::<i64>::new()
        );
        // At 300 the start itself and every later tick up to and including
        // 300 are due; the limits (10 runs, 3600 s lag) drop none of them.
        assert_eq!(
            select_due_scheduled_times(&s, 0, 300)
                .unwrap()
                .scheduled_times_secs,
            vec![120, 180, 240, 300]
        );
    }

    #[test]
    fn bounded_catch_up_applies_lag_and_max_runs() {
        let s = schedule(0, FlowMissedTickPolicy::BoundedCatchUp, 2, 180);
        let due = select_due_scheduled_times(&s, 0, 600).unwrap();
        // Due: 60..=600 (10 ticks). The lag cutoff at 600 - 180 = 420 drops
        // 60..=360 (6 ticks); of the remaining 4, max_runs keeps the newest 2.
        assert_eq!(due.scheduled_times_secs, vec![540, 600]);
        assert_eq!(due.skipped, 8);
    }

    #[test]
    fn bounded_catch_up_can_skip_all_due_scheduled_times() {
        let s = schedule(0, FlowMissedTickPolicy::BoundedCatchUp, 3, 30);
        let due = select_due_scheduled_times(&s, 0, 100).unwrap();
        // The only due tick is 60, which is older than the cutoff 100 - 30 = 70.
        assert!(due.scheduled_times_secs.is_empty());
        assert_eq!(due.skipped, 1);
    }

    #[test]
    fn skip_policy_keeps_only_latest_due_scheduled_time() {
        let s = schedule(0, FlowMissedTickPolicy::Skip, 5, 3600);
        let due = select_due_scheduled_times(&s, 0, 300).unwrap();
        // Due: 60, 120, 180, 240, 300. Only the latest is kept.
        assert_eq!(due.scheduled_times_secs, vec![300]);
        assert_eq!(due.skipped, 4);
    }

    #[test]
    fn huge_missed_gap_allocates_only_kept_scheduled_times() {
        let s = schedule(0, FlowMissedTickPolicy::BoundedCatchUp, 5, 3600);
        let due = select_due_scheduled_times(&s, 0, 86400).unwrap();
        // 1440 ticks are due over the day; only the newest 5 are returned.
        assert_eq!(
            due.scheduled_times_secs,
            vec![86160, 86220, 86280, 86340, 86400]
        );
        assert_eq!(due.skipped, 1435);
    }
}
353}