flow/batching_mode/checkpoint.rs
1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use crate::batching_mode::state::CheckpointMode;
16
17pub(super) const CHECKPOINT_DECISION_ADVANCE: &str = "advance";
18pub(super) const CHECKPOINT_DECISION_FALLBACK: &str = "fallback";
19pub(super) const CHECKPOINT_DECISION_CONTINUE_REPAIR: &str = "continue_repair";
20pub(super) const CHECKPOINT_REASON_NONE: &str = "none";
21
22/// Why the task fell back to full snapshot mode.
23#[derive(Debug, Clone, Copy, PartialEq, Eq)]
24pub(super) enum FlowQueryFallbackReason {
25 /// The query result did not include a region-watermark map at all.
26 MissingRegionWatermark,
27 /// Some participating regions could not prove safe advancement against
28 /// both the returned watermarks and the checkpoint map.
29 IncompleteRegionWatermark,
30 /// The query only covered part of the dirty backlog, so global checkpoints
31 /// cannot advance yet. Incremental SQL drains all dirty windows before
32 /// checkpoint advancement; this primarily protects scoped full-snapshot
33 /// runs capped by the per-query dirty-window limit.
34 DirtyBacklogPending,
35 /// The datanode detected a stale incremental cursor and the Flow
36 /// must recompute from scratch.
37 StaleCursor,
38 /// A fenced repair chunk tried to use a snapshot upper bound that the
39 /// storage engine can no longer enforce, so the current repair high must
40 /// be abandoned and rebound by a fresh scoped full snapshot repair.
41 SnapshotFenceExpired,
42 /// A non-stale-cursor query failure; the Flow resets to full snapshot
43 /// to avoid cascading errors.
44 IncrementalQueryFailure,
45 /// A non-incremental query failed while the task was already in full
46 /// snapshot or scoped repair mode.
47 QueryFailure,
48 /// Incremental mode has been permanently disabled for this Flow
49 /// (e.g. because the query shape is not incrementally safe).
50 IncrementalDisabled,
51}
52
53impl FlowQueryFallbackReason {
54 pub(super) fn as_label(self) -> &'static str {
55 match self {
56 Self::MissingRegionWatermark => "missing_region_watermark",
57 Self::IncompleteRegionWatermark => "incomplete_region_watermark",
58 Self::DirtyBacklogPending => "dirty_backlog_pending",
59 Self::StaleCursor => "stale_cursor",
60 Self::SnapshotFenceExpired => "snapshot_fence_expired",
61 Self::IncrementalQueryFailure => "incremental_query_failure",
62 Self::QueryFailure => "query_failure",
63 Self::IncrementalDisabled => "incremental_disabled",
64 }
65 }
66}
67
68/// Decision produced by `BatchingTask::apply_query_result_to_state` after
69/// each Flow query execution. Describes whether the task advanced its
70/// checkpoint state or fell back to full snapshot, and why.
71#[derive(Debug, Clone, Copy, PartialEq, Eq)]
72pub(super) enum FlowCheckpointDecision {
73 /// FullSnapshot → Incremental transition.
74 ///
75 /// The query exercised every participating region, all returned valid
76 /// watermarks, and the checkpoint map was populated from scratch.
77 /// Subsequent executions will use incremental after-seqs.
78 AdvancedFromFullSnapshot {
79 participating_regions: usize,
80 watermarks: usize,
81 },
82 /// Existing Incremental → Incremental (in-place advancement).
83 ///
84 /// A subset of participating regions advanced their watermarks. The
85 /// task stays in incremental mode with an updated checkpoint map.
86 AdvancedIncremental {
87 participating_regions: usize,
88 watermarks: usize,
89 },
90 /// FullSnapshot stayed in full snapshot mode because a scoped base repair
91 /// found additional dirty windows that may be concurrent with the returned
92 /// high watermark. These windows must be repaired under the fixed high
93 /// before checkpoints can advance.
94 ContinuedFencedRepair {
95 pending_windows: usize,
96 watermarks: usize,
97 },
98 /// Any mode → FullSnapshot.
99 ///
100 /// Watermark information was incomplete, a participating region was
101 /// absent from the existing checkpoint map, the task has permanently
102 /// disabled incremental mode, or the query itself failed. The task
103 /// resets to full snapshot semantics for the next execution.
104 FallbackToFullSnapshot {
105 previous_mode: CheckpointMode,
106 reason: FlowQueryFallbackReason,
107 },
108}
109
110impl FlowCheckpointDecision {
111 pub(super) fn mode_label(self) -> &'static str {
112 match self {
113 Self::AdvancedFromFullSnapshot { .. } => {
114 checkpoint_mode_label(CheckpointMode::FullSnapshot)
115 }
116 Self::AdvancedIncremental { .. } => checkpoint_mode_label(CheckpointMode::Incremental),
117 // Fenced repair is intentionally a FullSnapshot sub-state, not a
118 // third top-level checkpoint mode, so metrics keep the
119 // `full_snapshot` mode label while the decision label carries
120 // `continue_repair`.
121 Self::ContinuedFencedRepair { .. } => {
122 checkpoint_mode_label(CheckpointMode::FullSnapshot)
123 }
124 Self::FallbackToFullSnapshot { previous_mode, .. } => {
125 checkpoint_mode_label(previous_mode)
126 }
127 }
128 }
129
130 pub(super) fn decision_label(self) -> &'static str {
131 match self {
132 Self::AdvancedFromFullSnapshot { .. } | Self::AdvancedIncremental { .. } => {
133 CHECKPOINT_DECISION_ADVANCE
134 }
135 Self::ContinuedFencedRepair { .. } => CHECKPOINT_DECISION_CONTINUE_REPAIR,
136 Self::FallbackToFullSnapshot { .. } => CHECKPOINT_DECISION_FALLBACK,
137 }
138 }
139
140 pub(super) fn reason_label(self) -> &'static str {
141 match self {
142 Self::FallbackToFullSnapshot { reason, .. } => reason.as_label(),
143 _ => CHECKPOINT_REASON_NONE,
144 }
145 }
146}
147
148pub(super) fn checkpoint_mode_label(mode: CheckpointMode) -> &'static str {
149 match mode {
150 CheckpointMode::FullSnapshot => "full_snapshot",
151 CheckpointMode::Incremental => "incremental",
152 }
153}