flow/batching_mode/task/
inc.rs1use std::sync::Arc;
16
17use common_error::ext::BoxedError;
18use common_telemetry::debug;
19use common_telemetry::tracing::warn;
20use datafusion_expr::{DmlStatement, LogicalPlan};
21use query::options::{
22 FLOW_INCREMENTAL_AFTER_SEQS, FLOW_INCREMENTAL_MODE, FLOW_INCREMENTAL_MODE_MEMTABLE_ONLY,
23 FLOW_SINK_TABLE_ID,
24};
25use snafu::ResultExt;
26use table::metadata::TableId;
27
28use crate::Error;
29use crate::batching_mode::state::CheckpointMode;
30use crate::batching_mode::table_creator::QueryType;
31use crate::batching_mode::task::BatchingTask;
32use crate::batching_mode::utils::{
33 analyze_incremental_aggregate_plan, get_table_info_df_schema,
34 rewrite_incremental_aggregate_with_sink_merge,
35};
36use crate::error::{ExternalSnafu, UnexpectedSnafu};
37
38impl BatchingTask {
39 async fn sink_table_id(&self) -> Result<TableId, Error> {
40 let table = self
41 .config
42 .catalog_manager
43 .table(
44 &self.config.sink_table_name[0],
45 &self.config.sink_table_name[1],
46 &self.config.sink_table_name[2],
47 None,
48 )
49 .await
50 .map_err(BoxedError::new)
51 .context(ExternalSnafu)?
52 .ok_or_else(|| {
53 UnexpectedSnafu {
54 reason: format!(
55 "Flow {} cannot build incremental extensions because sink table {:?} was not found",
56 self.config.flow_id, self.config.sink_table_name
57 ),
58 }
59 .build()
60 })?;
61 Ok(table.table_info().table_id())
62 }
63
64 pub(super) async fn prepare_plan_for_incremental(
76 &self,
77 plan: &LogicalPlan,
78 ) -> Result<Option<LogicalPlan>, Error> {
79 let is_incremental_sql = {
80 let state = self.state.read().unwrap();
81 if state.is_incremental_disabled() {
82 return Ok(None);
83 }
84 state.checkpoint_mode() == CheckpointMode::Incremental
85 && matches!(self.config.query_type, QueryType::Sql)
86 };
87
88 if !is_incremental_sql {
89 return Ok(None);
90 }
91
92 let inner_plan = match plan {
96 LogicalPlan::Dml(dml) => dml.input.as_ref().clone(),
97 _ => return Ok(None),
98 };
99
100 let Some(analysis) = analyze_incremental_aggregate_plan(&inner_plan)? else {
107 warn!(
108 "Flow {} incremental mode but plan is not an aggregate query; \
109 permanently disabling incremental for this flow",
110 self.config.flow_id
111 );
112 self.state.write().unwrap().disable_incremental();
113 return Ok(None);
114 };
115
116 if !analysis.unsupported_exprs.is_empty() {
117 warn!(
118 "Flow {} incremental aggregate contains unsupported expressions {:?}; \
119 permanently disabling incremental for this flow",
120 self.config.flow_id, analysis.unsupported_exprs
121 );
122 self.state.write().unwrap().disable_incremental();
123 return Ok(None);
124 }
125
126 if analysis.merge_columns.is_empty() {
131 return Ok(Some(plan.clone()));
132 }
133
134 let sink_table = match get_table_info_df_schema(
141 self.config.catalog_manager.clone(),
142 self.config.sink_table_name.clone(),
143 )
144 .await
145 {
146 Ok((table, _)) => table,
147 Err(err) => {
148 warn!(
149 "Flow {} failed to fetch sink table for incremental rewrite; \
150 skipping this round to avoid unfiltered full snapshot: {:?}",
151 self.config.flow_id, err
152 );
153 return Ok(None);
154 }
155 };
156 let rewritten_inner = match rewrite_incremental_aggregate_with_sink_merge(
157 &inner_plan,
158 &analysis,
159 sink_table,
160 &self.config.sink_table_name,
161 None,
162 )
163 .await
164 {
165 Ok(plan) => plan,
166 Err(err) => {
167 warn!(
168 "Flow {} failed to rewrite incremental aggregate with sink merge; \
169 skipping this round to avoid unfiltered full snapshot: {:?}",
170 self.config.flow_id, err
171 );
172 return Ok(None);
173 }
174 };
175
176 let rewritten = match plan {
178 LogicalPlan::Dml(dml) => LogicalPlan::Dml(DmlStatement::new(
179 dml.table_name.clone(),
180 dml.target.clone(),
181 dml.op.clone(),
182 Arc::new(rewritten_inner),
183 )),
184 _ => unreachable!("already matched Dml above"),
185 };
186
187 debug!(
188 "Flow {} rewrote incremental SQL aggregate query with sink merge",
189 self.config.flow_id
190 );
191
192 Ok(Some(rewritten))
193 }
194
195 pub(super) async fn build_flow_query_extensions(
196 &self,
197 incremental_safe: bool,
198 can_advance_checkpoints: bool,
199 ) -> Result<Vec<(&'static str, String)>, Error> {
200 let mut extensions = vec![("flow.return_region_seq", "true".to_string())];
201
202 let incremental_checkpoints_json = {
203 let state = self.state.read().unwrap();
204 if incremental_safe
205 && can_advance_checkpoints
206 && !state.is_incremental_disabled()
207 && state.checkpoint_mode() == CheckpointMode::Incremental
208 && !state.checkpoints().is_empty()
209 {
210 Some(serde_json::to_string(state.checkpoints()).map_err(|err| {
211 UnexpectedSnafu {
212 reason: format!("Failed to serialize checkpoint map: {err}"),
213 }
214 .build()
215 })?)
216 } else {
217 None
218 }
219 };
220
221 if let Some(checkpoints_json) = incremental_checkpoints_json {
222 let sink_table_id = self.sink_table_id().await?;
223 extensions.push((FLOW_SINK_TABLE_ID, sink_table_id.to_string()));
224 extensions.push((
225 FLOW_INCREMENTAL_MODE,
226 FLOW_INCREMENTAL_MODE_MEMTABLE_ONLY.to_string(),
227 ));
228 extensions.push((FLOW_INCREMENTAL_AFTER_SEQS, checkpoints_json));
229 }
230
231 Ok(extensions)
232 }
233}