Skip to main content

flow/batching_mode/task/
inc.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::sync::Arc;
16
17use common_error::ext::BoxedError;
18use common_telemetry::debug;
19use common_telemetry::tracing::warn;
20use datafusion_expr::{DmlStatement, LogicalPlan};
21use query::options::{
22    FLOW_INCREMENTAL_AFTER_SEQS, FLOW_INCREMENTAL_MODE, FLOW_INCREMENTAL_MODE_MEMTABLE_ONLY,
23    FLOW_SINK_TABLE_ID,
24};
25use snafu::ResultExt;
26use table::metadata::TableId;
27
28use crate::Error;
29use crate::batching_mode::state::CheckpointMode;
30use crate::batching_mode::table_creator::QueryType;
31use crate::batching_mode::task::BatchingTask;
32use crate::batching_mode::utils::{
33    analyze_incremental_aggregate_plan, get_table_info_df_schema,
34    rewrite_incremental_aggregate_with_sink_merge,
35};
36use crate::error::{ExternalSnafu, UnexpectedSnafu};
37
38impl BatchingTask {
39    async fn sink_table_id(&self) -> Result<TableId, Error> {
40        let table = self
41            .config
42            .catalog_manager
43            .table(
44                &self.config.sink_table_name[0],
45                &self.config.sink_table_name[1],
46                &self.config.sink_table_name[2],
47                None,
48            )
49            .await
50            .map_err(BoxedError::new)
51            .context(ExternalSnafu)?
52            .ok_or_else(|| {
53                UnexpectedSnafu {
54                    reason: format!(
55                        "Flow {} cannot build incremental extensions because sink table {:?} was not found",
56                        self.config.flow_id, self.config.sink_table_name
57                    ),
58                }
59                .build()
60            })?;
61        Ok(table.table_info().table_id())
62    }
63
64    /// For incremental-mode SQL queries, attempt to prepare an executable plan
65    /// that is safe for incremental scan extensions.
66    ///
67    /// Returns `Some(plan)` when incremental extensions are safe. For an
68    /// incremental-delta query, `None` means the caller must not execute the
69    /// original plan without incremental extensions, because that would become
70    /// an unfiltered full snapshot; the caller should restore the dirty signal
71    /// and skip the current round instead. The returned plan may be either a
72    /// rewritten delta-LEFT-JOIN-sink merge plan or the original plan. In
73    /// particular, plain GROUP BY queries with no aggregate merge columns are
74    /// incremental safe without a rewrite, so they return `Some(original_plan)`.
75    pub(super) async fn prepare_plan_for_incremental(
76        &self,
77        plan: &LogicalPlan,
78    ) -> Result<Option<LogicalPlan>, Error> {
79        let is_incremental_sql = {
80            let state = self.state.read().unwrap();
81            if state.is_incremental_disabled() {
82                return Ok(None);
83            }
84            state.checkpoint_mode() == CheckpointMode::Incremental
85                && matches!(self.config.query_type, QueryType::Sql)
86        };
87
88        if !is_incremental_sql {
89            return Ok(None);
90        }
91
92        // Extract inner query plan from the DML wrapper.
93        // Non-DML or non-SQL plans bypass the rewrite and keep checkpoint mode;
94        // non-aggregate TQL or non-INSERT plans do not need incremental scan extensions.
95        let inner_plan = match plan {
96            LogicalPlan::Dml(dml) => dml.input.as_ref().clone(),
97            _ => return Ok(None),
98        };
99
100        // Analyze the plan for incremental rewritability.
101        // Incremental reads currently require aggregate / group-by plans that
102        // can be rewritten into a delta-left-join-sink merge. Non-aggregate SQL
103        // (projection, filter, or other non-aggregate shapes) stays full-snapshot
104        // until separately supported, and incremental mode is permanently
105        // disabled for this flow.
106        let Some(analysis) = analyze_incremental_aggregate_plan(&inner_plan)? else {
107            warn!(
108                "Flow {} incremental mode but plan is not an aggregate query; \
109                 permanently disabling incremental for this flow",
110                self.config.flow_id
111            );
112            self.state.write().unwrap().disable_incremental();
113            return Ok(None);
114        };
115
116        if !analysis.unsupported_exprs.is_empty() {
117            warn!(
118                "Flow {} incremental aggregate contains unsupported expressions {:?}; \
119                 permanently disabling incremental for this flow",
120                self.config.flow_id, analysis.unsupported_exprs
121            );
122            self.state.write().unwrap().disable_incremental();
123            return Ok(None);
124        }
125
126        // Plain GROUP BY without aggregate expressions has no values to
127        // merge between delta and sink. The incremental delta scan emits
128        // changed groups, and sink primary-key write semantics make this
129        // idempotent; no explicit left-join rewrite is needed.
130        if analysis.merge_columns.is_empty() {
131            return Ok(Some(plan.clone()));
132        }
133
134        // Fetch sink table for the merge rewrite.
135        // Transient errors (catalog, schema, filter, or rewrite) should not
136        // permanently disable incremental mode. They also must not execute the
137        // original plan without incremental extensions, because that would be an
138        // unfiltered full snapshot. The caller will restore the dirty signal and
139        // skip this round while keeping incremental retryable.
140        let sink_table = match get_table_info_df_schema(
141            self.config.catalog_manager.clone(),
142            self.config.sink_table_name.clone(),
143        )
144        .await
145        {
146            Ok((table, _)) => table,
147            Err(err) => {
148                warn!(
149                    "Flow {} failed to fetch sink table for incremental rewrite; \
150                     skipping this round to avoid unfiltered full snapshot: {:?}",
151                    self.config.flow_id, err
152                );
153                return Ok(None);
154            }
155        };
156        let rewritten_inner = match rewrite_incremental_aggregate_with_sink_merge(
157            &inner_plan,
158            &analysis,
159            sink_table,
160            &self.config.sink_table_name,
161            None,
162        )
163        .await
164        {
165            Ok(plan) => plan,
166            Err(err) => {
167                warn!(
168                    "Flow {} failed to rewrite incremental aggregate with sink merge; \
169                     skipping this round to avoid unfiltered full snapshot: {:?}",
170                    self.config.flow_id, err
171                );
172                return Ok(None);
173            }
174        };
175
176        // Reconstruct DML plan with the rewritten inner plan
177        let rewritten = match plan {
178            LogicalPlan::Dml(dml) => LogicalPlan::Dml(DmlStatement::new(
179                dml.table_name.clone(),
180                dml.target.clone(),
181                dml.op.clone(),
182                Arc::new(rewritten_inner),
183            )),
184            _ => unreachable!("already matched Dml above"),
185        };
186
187        debug!(
188            "Flow {} rewrote incremental SQL aggregate query with sink merge",
189            self.config.flow_id
190        );
191
192        Ok(Some(rewritten))
193    }
194
195    pub(super) async fn build_flow_query_extensions(
196        &self,
197        incremental_safe: bool,
198        can_advance_checkpoints: bool,
199    ) -> Result<Vec<(&'static str, String)>, Error> {
200        let mut extensions = vec![("flow.return_region_seq", "true".to_string())];
201
202        let incremental_checkpoints_json = {
203            let state = self.state.read().unwrap();
204            if incremental_safe
205                && can_advance_checkpoints
206                && !state.is_incremental_disabled()
207                && state.checkpoint_mode() == CheckpointMode::Incremental
208                && !state.checkpoints().is_empty()
209            {
210                Some(serde_json::to_string(state.checkpoints()).map_err(|err| {
211                    UnexpectedSnafu {
212                        reason: format!("Failed to serialize checkpoint map: {err}"),
213                    }
214                    .build()
215                })?)
216            } else {
217                None
218            }
219        };
220
221        if let Some(checkpoints_json) = incremental_checkpoints_json {
222            let sink_table_id = self.sink_table_id().await?;
223            extensions.push((FLOW_SINK_TABLE_ID, sink_table_id.to_string()));
224            extensions.push((
225                FLOW_INCREMENTAL_MODE,
226                FLOW_INCREMENTAL_MODE_MEMTABLE_ONLY.to_string(),
227            ));
228            extensions.push((FLOW_INCREMENTAL_AFTER_SEQS, checkpoints_json));
229        }
230
231        Ok(extensions)
232    }
233}