Skip to main content

flow/batching_mode/
checkpoint.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use crate::batching_mode::state::CheckpointMode;
16
17pub(super) const CHECKPOINT_DECISION_ADVANCE: &str = "advance";
18pub(super) const CHECKPOINT_DECISION_FALLBACK: &str = "fallback";
19pub(super) const CHECKPOINT_DECISION_CONTINUE_REPAIR: &str = "continue_repair";
20pub(super) const CHECKPOINT_REASON_NONE: &str = "none";
21
22/// Why the task fell back to full snapshot mode.
23#[derive(Debug, Clone, Copy, PartialEq, Eq)]
24pub(super) enum FlowQueryFallbackReason {
25    /// The query result did not include a region-watermark map at all.
26    MissingRegionWatermark,
27    /// Some participating regions could not prove safe advancement against
28    /// both the returned watermarks and the checkpoint map.
29    IncompleteRegionWatermark,
30    /// The query only covered part of the dirty backlog, so global checkpoints
31    /// cannot advance yet. Incremental SQL drains all dirty windows before
32    /// checkpoint advancement; this primarily protects scoped full-snapshot
33    /// runs capped by the per-query dirty-window limit.
34    DirtyBacklogPending,
35    /// The datanode detected a stale incremental cursor and the Flow
36    /// must recompute from scratch.
37    StaleCursor,
38    /// A fenced repair chunk tried to use a snapshot upper bound that the
39    /// storage engine can no longer enforce, so the current repair high must
40    /// be abandoned and rebound by a fresh scoped full snapshot repair.
41    SnapshotFenceExpired,
42    /// A non-stale-cursor query failure; the Flow resets to full snapshot
43    /// to avoid cascading errors.
44    IncrementalQueryFailure,
45    /// A non-incremental query failed while the task was already in full
46    /// snapshot or scoped repair mode.
47    QueryFailure,
48    /// Incremental mode has been permanently disabled for this Flow
49    /// (e.g. because the query shape is not incrementally safe).
50    IncrementalDisabled,
51}
52
53impl FlowQueryFallbackReason {
54    pub(super) fn as_label(self) -> &'static str {
55        match self {
56            Self::MissingRegionWatermark => "missing_region_watermark",
57            Self::IncompleteRegionWatermark => "incomplete_region_watermark",
58            Self::DirtyBacklogPending => "dirty_backlog_pending",
59            Self::StaleCursor => "stale_cursor",
60            Self::SnapshotFenceExpired => "snapshot_fence_expired",
61            Self::IncrementalQueryFailure => "incremental_query_failure",
62            Self::QueryFailure => "query_failure",
63            Self::IncrementalDisabled => "incremental_disabled",
64        }
65    }
66}
67
68/// Decision produced by `BatchingTask::apply_query_result_to_state` after
69/// each Flow query execution. Describes whether the task advanced its
70/// checkpoint state or fell back to full snapshot, and why.
71#[derive(Debug, Clone, Copy, PartialEq, Eq)]
72pub(super) enum FlowCheckpointDecision {
73    /// FullSnapshot → Incremental transition.
74    ///
75    /// The query exercised every participating region, all returned valid
76    /// watermarks, and the checkpoint map was populated from scratch.
77    /// Subsequent executions will use incremental after-seqs.
78    AdvancedFromFullSnapshot {
79        participating_regions: usize,
80        watermarks: usize,
81    },
82    /// Existing Incremental → Incremental (in-place advancement).
83    ///
84    /// A subset of participating regions advanced their watermarks. The
85    /// task stays in incremental mode with an updated checkpoint map.
86    AdvancedIncremental {
87        participating_regions: usize,
88        watermarks: usize,
89    },
90    /// FullSnapshot stayed in full snapshot mode because a scoped base repair
91    /// found additional dirty windows that may be concurrent with the returned
92    /// high watermark. These windows must be repaired under the fixed high
93    /// before checkpoints can advance.
94    ContinuedFencedRepair {
95        pending_windows: usize,
96        watermarks: usize,
97    },
98    /// Any mode → FullSnapshot.
99    ///
100    /// Watermark information was incomplete, a participating region was
101    /// absent from the existing checkpoint map, the task has permanently
102    /// disabled incremental mode, or the query itself failed. The task
103    /// resets to full snapshot semantics for the next execution.
104    FallbackToFullSnapshot {
105        previous_mode: CheckpointMode,
106        reason: FlowQueryFallbackReason,
107    },
108}
109
110impl FlowCheckpointDecision {
111    pub(super) fn mode_label(self) -> &'static str {
112        match self {
113            Self::AdvancedFromFullSnapshot { .. } => {
114                checkpoint_mode_label(CheckpointMode::FullSnapshot)
115            }
116            Self::AdvancedIncremental { .. } => checkpoint_mode_label(CheckpointMode::Incremental),
117            // Fenced repair is intentionally a FullSnapshot sub-state, not a
118            // third top-level checkpoint mode, so metrics keep the
119            // `full_snapshot` mode label while the decision label carries
120            // `continue_repair`.
121            Self::ContinuedFencedRepair { .. } => {
122                checkpoint_mode_label(CheckpointMode::FullSnapshot)
123            }
124            Self::FallbackToFullSnapshot { previous_mode, .. } => {
125                checkpoint_mode_label(previous_mode)
126            }
127        }
128    }
129
130    pub(super) fn decision_label(self) -> &'static str {
131        match self {
132            Self::AdvancedFromFullSnapshot { .. } | Self::AdvancedIncremental { .. } => {
133                CHECKPOINT_DECISION_ADVANCE
134            }
135            Self::ContinuedFencedRepair { .. } => CHECKPOINT_DECISION_CONTINUE_REPAIR,
136            Self::FallbackToFullSnapshot { .. } => CHECKPOINT_DECISION_FALLBACK,
137        }
138    }
139
140    pub(super) fn reason_label(self) -> &'static str {
141        match self {
142            Self::FallbackToFullSnapshot { reason, .. } => reason.as_label(),
143            _ => CHECKPOINT_REASON_NONE,
144        }
145    }
146}
147
148pub(super) fn checkpoint_mode_label(mode: CheckpointMode) -> &'static str {
149    match mode {
150        CheckpointMode::FullSnapshot => "full_snapshot",
151        CheckpointMode::Incremental => "incremental",
152    }
153}