Skip to main content

flow/batching_mode/task/
ckpt.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::time::Duration;
16
17use client::OutputWithMetrics;
18use common_error::ext::ErrorExt;
19use common_error::status_code::StatusCode;
20use common_telemetry::tracing::warn;
21use common_telemetry::{debug, info};
22
23use crate::batching_mode::checkpoint::{
24    FlowCheckpointDecision, FlowQueryFallbackReason, checkpoint_mode_label,
25};
26use crate::batching_mode::state::{CheckpointMode, TaskState};
27use crate::batching_mode::task::{BatchingTask, QueryCoverage};
28use crate::metrics::{
29    METRIC_FLOW_BATCHING_ENGINE_CHECKPOINT_DECISION_CNT, METRIC_FLOW_BATCHING_ENGINE_QUERY_MODE_CNT,
30};
31use crate::{Error, FlowId};
32
33impl BatchingTask {
34    /// Classify execution errors into checkpoint fallback reasons. A stale
35    /// snapshot fence is special only for fenced repair chunks.
36    pub(super) fn query_failure_reason(
37        err: &Error,
38        coverage: &QueryCoverage,
39    ) -> FlowQueryFallbackReason {
40        if err.status_code() == StatusCode::RequestOutdated {
41            if matches!(coverage, QueryCoverage::FencedRepairChunk { .. }) {
42                FlowQueryFallbackReason::SnapshotFenceExpired
43            } else {
44                FlowQueryFallbackReason::StaleCursor
45            }
46        } else if matches!(coverage, QueryCoverage::IncrementalDelta) {
47            FlowQueryFallbackReason::IncrementalQueryFailure
48        } else {
49            FlowQueryFallbackReason::QueryFailure
50        }
51    }
52
53    /// Apply the conservative state transition for a failed query. This never
54    /// advances checkpoints; it only decides whether to abandon repair state or
55    /// fall back from incremental mode.
56    pub(super) fn apply_query_failure_to_state(
57        state: &mut TaskState,
58        elapsed: Duration,
59        coverage: &QueryCoverage,
60        reason: FlowQueryFallbackReason,
61    ) -> Option<FlowCheckpointDecision> {
62        state.after_query_exec(elapsed, false);
63        let checkpoint_mode = state.checkpoint_mode();
64        if matches!(coverage, QueryCoverage::FencedRepairChunk { .. })
65            && matches!(reason, FlowQueryFallbackReason::SnapshotFenceExpired)
66        {
67            // `abandon_fenced_repair()` restores the not-yet-in-flight pending
68            // windows from the stale repair back to live dirty windows. The
69            // currently executing chunk is restored separately by the outer
70            // failure path (`handle_executed_query_failure`), after this state
71            // transition has removed `pending_fenced_repair`, so it also lands
72            // in live dirty windows for the next scoped base repair.
73            state.abandon_fenced_repair();
74            return Some(FlowCheckpointDecision::FallbackToFullSnapshot {
75                previous_mode: checkpoint_mode,
76                reason,
77            });
78        }
79
80        if checkpoint_mode == CheckpointMode::Incremental {
81            state.mark_full_snapshot();
82        }
83        Some(FlowCheckpointDecision::FallbackToFullSnapshot {
84            previous_mode: checkpoint_mode,
85            reason,
86        })
87    }
88
89    /// Apply checkpoint transitions for a successfully executed query using its
90    /// terminal watermark proof and declared coverage.
91    pub(super) fn apply_query_result_to_state(
92        state: &mut TaskState,
93        res: &OutputWithMetrics,
94        elapsed: Duration,
95        coverage: &QueryCoverage,
96    ) -> FlowCheckpointDecision {
97        state.after_query_exec(elapsed, true);
98        let checkpoint_mode = state.checkpoint_mode();
99        if let (Some(participating_regions), Some(watermark_map)) =
100            (res.participating_regions(), res.region_watermark_map())
101        {
102            let participating_region_count = participating_regions.len();
103            let watermark_count = watermark_map.len();
104            match coverage {
105                QueryCoverage::ScopedBaseRepair => {
106                    if !state.can_advance_full_snapshot_checkpoints(
107                        &participating_regions,
108                        &watermark_map,
109                    ) {
110                        return FlowCheckpointDecision::FallbackToFullSnapshot {
111                            previous_mode: checkpoint_mode,
112                            reason: FlowQueryFallbackReason::IncompleteRegionWatermark,
113                        };
114                    }
115
116                    if state.is_incremental_disabled() {
117                        return FlowCheckpointDecision::FallbackToFullSnapshot {
118                            previous_mode: CheckpointMode::FullSnapshot,
119                            reason: FlowQueryFallbackReason::IncrementalDisabled,
120                        };
121                    }
122
123                    if state.dirty_time_windows.is_empty() {
124                        state.advance_checkpoints(watermark_map);
125                        FlowCheckpointDecision::AdvancedFromFullSnapshot {
126                            participating_regions: participating_region_count,
127                            watermarks: watermark_count,
128                        }
129                    } else if let Some(repair) =
130                        state.start_fenced_repair(watermark_map.into_iter().collect())
131                    {
132                        FlowCheckpointDecision::ContinuedFencedRepair {
133                            pending_windows: repair.pending_windows().len(),
134                            watermarks: repair.high().len(),
135                        }
136                    } else {
137                        FlowCheckpointDecision::FallbackToFullSnapshot {
138                            previous_mode: checkpoint_mode,
139                            reason: FlowQueryFallbackReason::DirtyBacklogPending,
140                        }
141                    }
142                }
143                QueryCoverage::FencedRepairChunk { .. } => {
144                    if !state
145                        .fenced_repair_watermarks_match_high(&participating_regions, &watermark_map)
146                    {
147                        // A successful repair chunk whose terminal watermark
148                        // differs from the frozen fence `H` cannot prove that
149                        // continuing the same repair is safe. The pre-`H`
150                        // repair work item already executed under the snapshot
151                        // fence, so do not requeue it; restore only
152                        // not-yet-in-flight pending windows, then start over
153                        // with a fresh H. Later/post-`H` writes still rely on
154                        // live dirty notifications.
155                        state.abandon_fenced_repair();
156                        return FlowCheckpointDecision::FallbackToFullSnapshot {
157                            previous_mode: checkpoint_mode,
158                            reason: FlowQueryFallbackReason::IncompleteRegionWatermark,
159                        };
160                    }
161
162                    if state.fenced_repair_pending_is_empty() {
163                        state.finish_fenced_repair();
164                        if state.is_incremental_disabled() {
165                            FlowCheckpointDecision::FallbackToFullSnapshot {
166                                previous_mode: CheckpointMode::FullSnapshot,
167                                reason: FlowQueryFallbackReason::IncrementalDisabled,
168                            }
169                        } else {
170                            FlowCheckpointDecision::AdvancedFromFullSnapshot {
171                                participating_regions: participating_region_count,
172                                watermarks: watermark_count,
173                            }
174                        }
175                    } else {
176                        let repair = state
177                            .pending_fenced_repair()
178                            .expect("fenced repair exists after matching repair chunk watermark");
179                        FlowCheckpointDecision::ContinuedFencedRepair {
180                            pending_windows: repair.pending_windows().len(),
181                            watermarks: repair.high().len(),
182                        }
183                    }
184                }
185                QueryCoverage::UnfilteredFull => {
186                    if state.can_advance_full_snapshot_checkpoints(
187                        &participating_regions,
188                        &watermark_map,
189                    ) {
190                        state.advance_checkpoints(watermark_map);
191                        if state.is_incremental_disabled() {
192                            FlowCheckpointDecision::FallbackToFullSnapshot {
193                                previous_mode: CheckpointMode::FullSnapshot,
194                                reason: FlowQueryFallbackReason::IncrementalDisabled,
195                            }
196                        } else {
197                            FlowCheckpointDecision::AdvancedFromFullSnapshot {
198                                participating_regions: participating_region_count,
199                                watermarks: watermark_count,
200                            }
201                        }
202                    } else {
203                        debug_assert_ne!(checkpoint_mode, CheckpointMode::Incremental);
204                        FlowCheckpointDecision::FallbackToFullSnapshot {
205                            previous_mode: checkpoint_mode,
206                            reason: FlowQueryFallbackReason::IncompleteRegionWatermark,
207                        }
208                    }
209                }
210                QueryCoverage::IncrementalDelta => {
211                    if state.can_advance_incremental_checkpoints_with_participation(
212                        &participating_regions,
213                        &watermark_map,
214                    ) {
215                        state.advance_incremental_checkpoints_with_participation(
216                            &participating_regions,
217                            watermark_map,
218                        );
219                        FlowCheckpointDecision::AdvancedIncremental {
220                            participating_regions: participating_region_count,
221                            watermarks: watermark_count,
222                        }
223                    } else {
224                        state.mark_full_snapshot();
225                        FlowCheckpointDecision::FallbackToFullSnapshot {
226                            previous_mode: checkpoint_mode,
227                            reason: FlowQueryFallbackReason::IncompleteRegionWatermark,
228                        }
229                    }
230                }
231            }
232        } else {
233            if matches!(coverage, QueryCoverage::FencedRepairChunk { .. }) {
234                state.abandon_fenced_repair();
235            }
236            if matches!(checkpoint_mode, CheckpointMode::Incremental) {
237                state.mark_full_snapshot();
238            }
239            FlowCheckpointDecision::FallbackToFullSnapshot {
240                previous_mode: checkpoint_mode,
241                reason: FlowQueryFallbackReason::MissingRegionWatermark,
242            }
243        }
244    }
245
246    pub(super) fn record_checkpoint_decision(flow_id: FlowId, decision: FlowCheckpointDecision) {
247        let flow_id = flow_id.to_string();
248        METRIC_FLOW_BATCHING_ENGINE_CHECKPOINT_DECISION_CNT
249            .with_label_values(&[
250                flow_id.as_str(),
251                decision.mode_label(),
252                decision.decision_label(),
253                decision.reason_label(),
254            ])
255            .inc();
256
257        match decision {
258            FlowCheckpointDecision::AdvancedFromFullSnapshot {
259                participating_regions,
260                watermarks,
261            } => {
262                info!(
263                    "Flow {flow_id} switched to incremental mode after full snapshot, participating_regions={participating_regions}, watermarks={watermarks}"
264                );
265            }
266            FlowCheckpointDecision::AdvancedIncremental {
267                participating_regions,
268                watermarks,
269            } => {
270                debug!(
271                    "Flow {flow_id} advanced incremental checkpoints, participating_regions={participating_regions}, watermarks={watermarks}"
272                );
273            }
274            FlowCheckpointDecision::ContinuedFencedRepair {
275                pending_windows,
276                watermarks,
277            } => {
278                debug!(
279                    "Flow {flow_id} continued fenced repair, pending_windows={pending_windows}, watermarks={watermarks}"
280                );
281            }
282            FlowCheckpointDecision::FallbackToFullSnapshot {
283                previous_mode,
284                reason,
285            } => {
286                warn!(
287                    "Flow {flow_id} switched to full snapshot mode, previous_mode={}, reason={}",
288                    checkpoint_mode_label(previous_mode),
289                    reason.as_label()
290                );
291            }
292        }
293    }
294
295    pub(super) fn record_query_mode(flow_id: FlowId, mode: CheckpointMode) {
296        let flow_id = flow_id.to_string();
297        METRIC_FLOW_BATCHING_ENGINE_QUERY_MODE_CNT
298            .with_label_values(&[flow_id.as_str(), checkpoint_mode_label(mode)])
299            .inc();
300    }
301}