1use std::time::Duration;
16
17use client::OutputWithMetrics;
18use common_error::ext::ErrorExt;
19use common_error::status_code::StatusCode;
20use common_telemetry::tracing::warn;
21use common_telemetry::{debug, info};
22
23use crate::batching_mode::checkpoint::{
24 FlowCheckpointDecision, FlowQueryFallbackReason, checkpoint_mode_label,
25};
26use crate::batching_mode::state::{CheckpointMode, TaskState};
27use crate::batching_mode::task::{BatchingTask, QueryCoverage};
28use crate::metrics::{
29 METRIC_FLOW_BATCHING_ENGINE_CHECKPOINT_DECISION_CNT, METRIC_FLOW_BATCHING_ENGINE_QUERY_MODE_CNT,
30};
31use crate::{Error, FlowId};
32
33impl BatchingTask {
34 pub(super) fn query_failure_reason(
37 err: &Error,
38 coverage: &QueryCoverage,
39 ) -> FlowQueryFallbackReason {
40 if err.status_code() == StatusCode::RequestOutdated {
41 if matches!(coverage, QueryCoverage::FencedRepairChunk { .. }) {
42 FlowQueryFallbackReason::SnapshotFenceExpired
43 } else {
44 FlowQueryFallbackReason::StaleCursor
45 }
46 } else if matches!(coverage, QueryCoverage::IncrementalDelta) {
47 FlowQueryFallbackReason::IncrementalQueryFailure
48 } else {
49 FlowQueryFallbackReason::QueryFailure
50 }
51 }
52
53 pub(super) fn apply_query_failure_to_state(
57 state: &mut TaskState,
58 elapsed: Duration,
59 coverage: &QueryCoverage,
60 reason: FlowQueryFallbackReason,
61 ) -> Option<FlowCheckpointDecision> {
62 state.after_query_exec(elapsed, false);
63 let checkpoint_mode = state.checkpoint_mode();
64 if matches!(coverage, QueryCoverage::FencedRepairChunk { .. })
65 && matches!(reason, FlowQueryFallbackReason::SnapshotFenceExpired)
66 {
67 state.abandon_fenced_repair();
74 return Some(FlowCheckpointDecision::FallbackToFullSnapshot {
75 previous_mode: checkpoint_mode,
76 reason,
77 });
78 }
79
80 if checkpoint_mode == CheckpointMode::Incremental {
81 state.mark_full_snapshot();
82 }
83 Some(FlowCheckpointDecision::FallbackToFullSnapshot {
84 previous_mode: checkpoint_mode,
85 reason,
86 })
87 }
88
89 pub(super) fn apply_query_result_to_state(
92 state: &mut TaskState,
93 res: &OutputWithMetrics,
94 elapsed: Duration,
95 coverage: &QueryCoverage,
96 ) -> FlowCheckpointDecision {
97 state.after_query_exec(elapsed, true);
98 let checkpoint_mode = state.checkpoint_mode();
99 if let (Some(participating_regions), Some(watermark_map)) =
100 (res.participating_regions(), res.region_watermark_map())
101 {
102 let participating_region_count = participating_regions.len();
103 let watermark_count = watermark_map.len();
104 match coverage {
105 QueryCoverage::ScopedBaseRepair => {
106 if !state.can_advance_full_snapshot_checkpoints(
107 &participating_regions,
108 &watermark_map,
109 ) {
110 return FlowCheckpointDecision::FallbackToFullSnapshot {
111 previous_mode: checkpoint_mode,
112 reason: FlowQueryFallbackReason::IncompleteRegionWatermark,
113 };
114 }
115
116 if state.is_incremental_disabled() {
117 return FlowCheckpointDecision::FallbackToFullSnapshot {
118 previous_mode: CheckpointMode::FullSnapshot,
119 reason: FlowQueryFallbackReason::IncrementalDisabled,
120 };
121 }
122
123 if state.dirty_time_windows.is_empty() {
124 state.advance_checkpoints(watermark_map);
125 FlowCheckpointDecision::AdvancedFromFullSnapshot {
126 participating_regions: participating_region_count,
127 watermarks: watermark_count,
128 }
129 } else if let Some(repair) =
130 state.start_fenced_repair(watermark_map.into_iter().collect())
131 {
132 FlowCheckpointDecision::ContinuedFencedRepair {
133 pending_windows: repair.pending_windows().len(),
134 watermarks: repair.high().len(),
135 }
136 } else {
137 FlowCheckpointDecision::FallbackToFullSnapshot {
138 previous_mode: checkpoint_mode,
139 reason: FlowQueryFallbackReason::DirtyBacklogPending,
140 }
141 }
142 }
143 QueryCoverage::FencedRepairChunk { .. } => {
144 if !state
145 .fenced_repair_watermarks_match_high(&participating_regions, &watermark_map)
146 {
147 state.abandon_fenced_repair();
156 return FlowCheckpointDecision::FallbackToFullSnapshot {
157 previous_mode: checkpoint_mode,
158 reason: FlowQueryFallbackReason::IncompleteRegionWatermark,
159 };
160 }
161
162 if state.fenced_repair_pending_is_empty() {
163 state.finish_fenced_repair();
164 if state.is_incremental_disabled() {
165 FlowCheckpointDecision::FallbackToFullSnapshot {
166 previous_mode: CheckpointMode::FullSnapshot,
167 reason: FlowQueryFallbackReason::IncrementalDisabled,
168 }
169 } else {
170 FlowCheckpointDecision::AdvancedFromFullSnapshot {
171 participating_regions: participating_region_count,
172 watermarks: watermark_count,
173 }
174 }
175 } else {
176 let repair = state
177 .pending_fenced_repair()
178 .expect("fenced repair exists after matching repair chunk watermark");
179 FlowCheckpointDecision::ContinuedFencedRepair {
180 pending_windows: repair.pending_windows().len(),
181 watermarks: repair.high().len(),
182 }
183 }
184 }
185 QueryCoverage::UnfilteredFull => {
186 if state.can_advance_full_snapshot_checkpoints(
187 &participating_regions,
188 &watermark_map,
189 ) {
190 state.advance_checkpoints(watermark_map);
191 if state.is_incremental_disabled() {
192 FlowCheckpointDecision::FallbackToFullSnapshot {
193 previous_mode: CheckpointMode::FullSnapshot,
194 reason: FlowQueryFallbackReason::IncrementalDisabled,
195 }
196 } else {
197 FlowCheckpointDecision::AdvancedFromFullSnapshot {
198 participating_regions: participating_region_count,
199 watermarks: watermark_count,
200 }
201 }
202 } else {
203 debug_assert_ne!(checkpoint_mode, CheckpointMode::Incremental);
204 FlowCheckpointDecision::FallbackToFullSnapshot {
205 previous_mode: checkpoint_mode,
206 reason: FlowQueryFallbackReason::IncompleteRegionWatermark,
207 }
208 }
209 }
210 QueryCoverage::IncrementalDelta => {
211 if state.can_advance_incremental_checkpoints_with_participation(
212 &participating_regions,
213 &watermark_map,
214 ) {
215 state.advance_incremental_checkpoints_with_participation(
216 &participating_regions,
217 watermark_map,
218 );
219 FlowCheckpointDecision::AdvancedIncremental {
220 participating_regions: participating_region_count,
221 watermarks: watermark_count,
222 }
223 } else {
224 state.mark_full_snapshot();
225 FlowCheckpointDecision::FallbackToFullSnapshot {
226 previous_mode: checkpoint_mode,
227 reason: FlowQueryFallbackReason::IncompleteRegionWatermark,
228 }
229 }
230 }
231 }
232 } else {
233 if matches!(coverage, QueryCoverage::FencedRepairChunk { .. }) {
234 state.abandon_fenced_repair();
235 }
236 if matches!(checkpoint_mode, CheckpointMode::Incremental) {
237 state.mark_full_snapshot();
238 }
239 FlowCheckpointDecision::FallbackToFullSnapshot {
240 previous_mode: checkpoint_mode,
241 reason: FlowQueryFallbackReason::MissingRegionWatermark,
242 }
243 }
244 }
245
246 pub(super) fn record_checkpoint_decision(flow_id: FlowId, decision: FlowCheckpointDecision) {
247 let flow_id = flow_id.to_string();
248 METRIC_FLOW_BATCHING_ENGINE_CHECKPOINT_DECISION_CNT
249 .with_label_values(&[
250 flow_id.as_str(),
251 decision.mode_label(),
252 decision.decision_label(),
253 decision.reason_label(),
254 ])
255 .inc();
256
257 match decision {
258 FlowCheckpointDecision::AdvancedFromFullSnapshot {
259 participating_regions,
260 watermarks,
261 } => {
262 info!(
263 "Flow {flow_id} switched to incremental mode after full snapshot, participating_regions={participating_regions}, watermarks={watermarks}"
264 );
265 }
266 FlowCheckpointDecision::AdvancedIncremental {
267 participating_regions,
268 watermarks,
269 } => {
270 debug!(
271 "Flow {flow_id} advanced incremental checkpoints, participating_regions={participating_regions}, watermarks={watermarks}"
272 );
273 }
274 FlowCheckpointDecision::ContinuedFencedRepair {
275 pending_windows,
276 watermarks,
277 } => {
278 debug!(
279 "Flow {flow_id} continued fenced repair, pending_windows={pending_windows}, watermarks={watermarks}"
280 );
281 }
282 FlowCheckpointDecision::FallbackToFullSnapshot {
283 previous_mode,
284 reason,
285 } => {
286 warn!(
287 "Flow {flow_id} switched to full snapshot mode, previous_mode={}, reason={}",
288 checkpoint_mode_label(previous_mode),
289 reason.as_label()
290 );
291 }
292 }
293 }
294
295 pub(super) fn record_query_mode(flow_id: FlowId, mode: CheckpointMode) {
296 let flow_id = flow_id.to_string();
297 METRIC_FLOW_BATCHING_ENGINE_QUERY_MODE_CNT
298 .with_label_values(&[flow_id.as_str(), checkpoint_mode_label(mode)])
299 .inc();
300 }
301}