1pub use common_meta::key::flow::flow_info::{FlowMissedTickPolicy, FlowScheduleConfig};
18use snafu::ensure;
19
20use crate::error::{InvalidQuerySnafu, Result};
21
22#[derive(Debug, Clone, PartialEq)]
24pub struct EvalSchedule {
25 pub interval_secs: i64,
27 pub anchor_secs: i64,
29 pub start_secs: i64,
31 pub missed_tick_policy: FlowMissedTickPolicy,
33 pub max_runs: u32,
35 pub max_lag_secs: i64,
37}
38
39impl EvalSchedule {
40 pub fn from_config(
41 eval_interval_secs: Option<i64>,
42 config: Option<&FlowScheduleConfig>,
43 ) -> Result<Option<Self>> {
44 let Some(interval_secs) = eval_interval_secs else {
45 return Ok(None);
46 };
47 ensure!(
48 interval_secs > 0,
49 InvalidQuerySnafu {
50 reason: format!(
51 "Invalid eval_interval_secs: must be positive, got {interval_secs}"
52 )
53 }
54 );
55
56 Ok(Some(match config {
57 Some(c) => {
58 ensure!(
59 c.catchup_max_runs > 0,
60 InvalidQuerySnafu {
61 reason:
62 "Invalid FlowScheduleConfig.catchup_max_runs: must be positive, got 0"
63 .to_string()
64 }
65 );
66 ensure!(
67 c.catchup_max_lag_secs > 0,
68 InvalidQuerySnafu {
69 reason: format!(
70 "Invalid FlowScheduleConfig.catchup_max_lag_secs: must be positive, got {}",
71 c.catchup_max_lag_secs
72 )
73 }
74 );
75
76 Self {
77 interval_secs,
78 anchor_secs: c.anchor_secs,
79 start_secs: c.start_secs,
80 missed_tick_policy: c.missed_tick_policy,
81 max_runs: c.catchup_max_runs,
82 max_lag_secs: c.catchup_max_lag_secs,
83 }
84 }
85 None => {
86 let c = FlowScheduleConfig::default_with_start(0, interval_secs);
87 Self {
88 interval_secs,
89 anchor_secs: c.anchor_secs,
90 start_secs: c.start_secs,
91 missed_tick_policy: c.missed_tick_policy,
92 max_runs: c.catchup_max_runs,
93 max_lag_secs: c.catchup_max_lag_secs,
94 }
95 }
96 }))
97 }
98
99 pub fn next_scheduled_time_after(&self, cursor_secs: i64) -> i64 {
101 next_in_sequence(cursor_secs, self.start_secs, self.interval_secs)
102 }
103}
104
105fn next_in_sequence(cursor: i64, start: i64, interval: i64) -> i64 {
106 if interval <= 0 {
107 return cursor.saturating_add(1).max(start);
108 }
109 if cursor < start {
110 return start;
111 }
112
113 let k = (cursor - start) / interval;
114 start.saturating_add((k + 1).saturating_mul(interval))
115}
116
117fn first_due_in_sequence(cursor: i64, start: i64, interval: i64) -> i64 {
118 if interval <= 0 {
119 return cursor.saturating_add(1).max(start);
120 }
121 if cursor < start {
122 start
123 } else {
124 next_in_sequence(cursor, start, interval)
125 }
126}
127
128#[derive(Debug, Clone, PartialEq)]
134pub struct DueScheduledTimes {
135 pub scheduled_times_secs: Vec<i64>,
137 pub skipped: u64,
139}
140
141pub fn select_due_scheduled_times(
143 schedule: &EvalSchedule,
144 cursor_secs: i64,
145 wall_now_secs: i64,
146) -> Option<DueScheduledTimes> {
147 if schedule.interval_secs <= 0 {
148 return None;
149 }
150
151 let first_due = first_due_in_sequence(cursor_secs, schedule.start_secs, schedule.interval_secs);
152 if first_due > wall_now_secs {
153 return Some(DueScheduledTimes {
154 scheduled_times_secs: vec![],
155 skipped: 0,
156 });
157 }
158
159 let total_count = ((wall_now_secs - first_due) / schedule.interval_secs) as u64 + 1;
160 match schedule.missed_tick_policy {
161 FlowMissedTickPolicy::Skip => {
162 let last = first_due + (total_count as i64 - 1) * schedule.interval_secs;
163 Some(DueScheduledTimes {
164 scheduled_times_secs: vec![last],
165 skipped: total_count.saturating_sub(1),
166 })
167 }
168 FlowMissedTickPolicy::BoundedCatchUp => {
169 let cutoff = wall_now_secs.saturating_sub(schedule.max_lag_secs);
170 let skipped_by_cutoff = if first_due >= cutoff {
171 0
172 } else {
173 ((cutoff - first_due + schedule.interval_secs - 1) / schedule.interval_secs) as u64
174 }
175 .min(total_count);
176
177 let remaining = total_count.saturating_sub(skipped_by_cutoff);
178 if remaining == 0 {
179 return Some(DueScheduledTimes {
180 scheduled_times_secs: vec![],
181 skipped: total_count,
182 });
183 }
184
185 let keep_count = remaining.min(schedule.max_runs as u64);
189 let keep_start = skipped_by_cutoff + remaining.saturating_sub(keep_count);
190 let scheduled_times_secs = (0..keep_count)
191 .map(|i| first_due + (keep_start as i64 + i as i64) * schedule.interval_secs)
192 .collect::<Vec<_>>();
193
194 Some(DueScheduledTimes {
195 scheduled_times_secs,
196 skipped: total_count - keep_count,
197 })
198 }
199 }
200}
201
202pub fn ceil_to_boundary(time: i64, anchor: i64, interval: i64) -> i64 {
204 if interval <= 0 {
205 return time;
206 }
207 if time <= anchor {
208 return anchor;
209 }
210
211 let diff = i128::from(time) - i128::from(anchor);
212 let interval = i128::from(interval);
213 let k = (diff + interval - 1) / interval;
214 let boundary = i128::from(anchor) + k * interval;
215
216 boundary.clamp(i128::from(i64::MIN), i128::from(i64::MAX)) as i64
217}
218
219#[cfg(test)]
220mod test {
221 use super::*;
222
223 fn schedule(
224 start: i64,
225 policy: FlowMissedTickPolicy,
226 max_runs: u32,
227 max_lag_secs: i64,
228 ) -> EvalSchedule {
229 EvalSchedule {
230 interval_secs: 60,
231 anchor_secs: 0,
232 start_secs: start,
233 missed_tick_policy: policy,
234 max_runs,
235 max_lag_secs,
236 }
237 }
238
239 fn config(policy: FlowMissedTickPolicy) -> FlowScheduleConfig {
240 FlowScheduleConfig {
241 anchor_secs: 10,
242 start_secs: 70,
243 missed_tick_policy: policy,
244 catchup_max_runs: 4,
245 catchup_max_lag_secs: 600,
246 }
247 }
248
249 #[test]
250 fn ceil_to_boundary_handles_anchor_and_interval_edges() {
251 assert_eq!(ceil_to_boundary(-10, 0, 60), 0);
252 assert_eq!(ceil_to_boundary(0, 0, 60), 0);
253 assert_eq!(ceil_to_boundary(1, 0, 60), 60);
254 assert_eq!(ceil_to_boundary(60, 0, 60), 60);
255 assert_eq!(ceil_to_boundary(101, 100, 60), 160);
256 assert_eq!(ceil_to_boundary(50, 0, 0), 50);
257 assert_eq!(ceil_to_boundary(i64::MAX, 0, 60), i64::MAX);
258 assert_eq!(ceil_to_boundary(i64::MAX - 1, i64::MIN, 60), i64::MAX);
259 }
260
261 #[test]
262 fn from_config_maps_typed_config_and_defaults() {
263 assert!(EvalSchedule::from_config(None, None).unwrap().is_none());
264 assert!(EvalSchedule::from_config(Some(0), None).is_err());
265
266 let from_typed =
267 EvalSchedule::from_config(Some(300), Some(&config(FlowMissedTickPolicy::Skip)))
268 .unwrap()
269 .unwrap();
270 assert_eq!(from_typed.interval_secs, 300);
271 assert_eq!(from_typed.anchor_secs, 10);
272 assert_eq!(from_typed.start_secs, 70);
273 assert_eq!(from_typed.missed_tick_policy, FlowMissedTickPolicy::Skip);
274 assert_eq!(from_typed.max_runs, 4);
275 assert_eq!(from_typed.max_lag_secs, 600);
276
277 let defaulted = EvalSchedule::from_config(Some(300), None).unwrap().unwrap();
278 assert_eq!(defaulted.start_secs, 0);
279 assert_eq!(defaulted.max_runs, 3);
280 assert_eq!(defaulted.max_lag_secs, 900);
281 }
282
283 #[test]
284 fn from_config_rejects_invalid_catchup_limits() {
285 let mut c = config(FlowMissedTickPolicy::BoundedCatchUp);
286 c.catchup_max_runs = 0;
287 assert!(EvalSchedule::from_config(Some(300), Some(&c)).is_err());
288
289 let mut c = config(FlowMissedTickPolicy::BoundedCatchUp);
290 c.catchup_max_lag_secs = 0;
291 assert!(EvalSchedule::from_config(Some(300), Some(&c)).is_err());
292 }
293
294 #[test]
295 fn next_scheduled_time_after_respects_start_sequence() {
296 let s = schedule(50, FlowMissedTickPolicy::BoundedCatchUp, 3, 300);
297 assert_eq!(s.next_scheduled_time_after(0), 50);
298 assert_eq!(s.next_scheduled_time_after(50), 110);
299 assert_eq!(s.next_scheduled_time_after(100), 110);
300 }
301
302 #[test]
303 fn due_scheduled_time_selection_handles_empty_and_start_boundary() {
304 let s = schedule(120, FlowMissedTickPolicy::BoundedCatchUp, 10, 3600);
305 assert_eq!(
306 select_due_scheduled_times(&s, 0, 100)
307 .unwrap()
308 .scheduled_times_secs,
309 Vec::<i64>::new()
310 );
311 assert_eq!(
312 select_due_scheduled_times(&s, 0, 300)
313 .unwrap()
314 .scheduled_times_secs,
315 vec![120, 180, 240, 300]
316 );
317 }
318
319 #[test]
320 fn bounded_catch_up_applies_lag_and_max_runs() {
321 let s = schedule(0, FlowMissedTickPolicy::BoundedCatchUp, 2, 180);
322 let due = select_due_scheduled_times(&s, 0, 600).unwrap();
323 assert_eq!(due.scheduled_times_secs, vec![540, 600]);
324 assert_eq!(due.skipped, 8);
325 }
326
327 #[test]
328 fn bounded_catch_up_can_skip_all_due_scheduled_times() {
329 let s = schedule(0, FlowMissedTickPolicy::BoundedCatchUp, 3, 30);
330 let due = select_due_scheduled_times(&s, 0, 100).unwrap();
331 assert!(due.scheduled_times_secs.is_empty());
332 assert_eq!(due.skipped, 1);
333 }
334
335 #[test]
336 fn skip_policy_keeps_only_latest_due_scheduled_time() {
337 let s = schedule(0, FlowMissedTickPolicy::Skip, 5, 3600);
338 let due = select_due_scheduled_times(&s, 0, 300).unwrap();
339 assert_eq!(due.scheduled_times_secs, vec![300]);
340 assert_eq!(due.skipped, 4);
341 }
342
343 #[test]
344 fn huge_missed_gap_allocates_only_kept_scheduled_times() {
345 let s = schedule(0, FlowMissedTickPolicy::BoundedCatchUp, 5, 3600);
346 let due = select_due_scheduled_times(&s, 0, 86400).unwrap();
347 assert_eq!(
348 due.scheduled_times_secs,
349 vec![86160, 86220, 86280, 86340, 86400]
350 );
351 assert_eq!(due.skipped, 1435);
352 }
353}