Skip to main content

flow/batching_mode/
task.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
16use std::sync::{Arc, RwLock};
17use std::time::{Duration, SystemTime, UNIX_EPOCH};
18
19use api::v1::{CreateTableExpr, TableName};
20use catalog::CatalogManagerRef;
21use common_error::ext::BoxedError;
22use common_query::logical_plan::breakup_insert_plan;
23use common_telemetry::tracing::warn;
24use common_telemetry::{debug, info};
25use common_time::Timestamp;
26use datafusion::datasource::DefaultTableSource;
27use datafusion::sql::unparser::expr_to_sql;
28use datafusion_common::tree_node::{Transformed, TreeNode};
29use datafusion_common::utils::quote_identifier;
30use datafusion_common::{DFSchemaRef, TableReference};
31use datafusion_expr::{DmlStatement, LogicalPlan, WriteOp, col, lit};
32use datatypes::schema::Schema;
33use query::QueryEngineRef;
34use query::options::FLOW_INCREMENTAL_MODE;
35use query::query_engine::DefaultSerializer;
36use session::context::QueryContextRef;
37use snafu::{OptionExt, ResultExt};
38use sql::parsers::utils::is_tql;
39use store_api::mito_engine_options::MERGE_MODE_KEY;
40use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
41use table::table::adapter::DfTableProviderAdapter;
42use tokio::sync::oneshot::error::TryRecvError;
43use tokio::sync::{Mutex, oneshot};
44use tokio::time::Instant;
45
46use crate::batching_mode::BatchingModeOptions;
47use crate::batching_mode::checkpoint::checkpoint_mode_label;
48use crate::batching_mode::eval_schedule::{EvalSchedule, select_due_scheduled_times};
49use crate::batching_mode::frontend_client::{FrontendClient, PeerDesc};
50use crate::batching_mode::state::{
51    CheckpointMode, DirtyTimeWindows, FilterExprInfo, TaskState, to_df_literal,
52};
53use crate::batching_mode::table_creator::{QueryType, create_table_with_expr};
54use crate::batching_mode::time_window::TimeWindowExpr;
55use crate::batching_mode::utils::{
56    AddFilterRewriter, ColumnMatcherRewriter, df_plan_to_sql, gen_plan_with_matching_schema,
57    get_table_info_df_schema, sql_to_df_plan,
58};
59use crate::df_optimizer::apply_df_optimizer;
60use crate::error::{
61    DatafusionSnafu, ExternalSnafu, InvalidQuerySnafu, SubstraitEncodeLogicalPlanSnafu,
62    UnexpectedSnafu,
63};
64use crate::metrics::{
65    METRIC_FLOW_BATCHING_ENGINE_ERROR_CNT, METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME,
66    METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY, METRIC_FLOW_BATCHING_ENGINE_START_QUERY_CNT,
67    METRIC_FLOW_ROWS,
68};
69use crate::{Error, FlowId};
70
71mod ckpt;
72mod inc;
73
/// Returns the current wall-clock Unix timestamp in seconds.
///
/// If the system clock reports a time before the Unix epoch, the elapsed
/// duration falls back to zero and `0` is returned instead of an error.
fn wall_clock_unix_secs() -> i64 {
    let since_epoch = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed,
        // Clock is set before the epoch: treat it as "no time elapsed".
        Err(_) => Duration::default(),
    };
    since_epoch.as_secs() as i64
}
81
/// The task's config, immutable once created
#[derive(Clone)]
pub struct TaskConfig {
    /// Identifier of the flow this task evaluates.
    pub flow_id: FlowId,
    /// The flow's query text; also used to classify the query as SQL or TQL.
    pub query: String,
    /// output schema of the query
    pub output_schema: DFSchemaRef,
    /// Optional time-window expression of the flow.
    pub time_window_expr: Option<TimeWindowExpr>,
    /// in seconds
    pub expire_after: Option<i64>,
    /// Sink table name, indexed as `[catalog, schema, table]`.
    pub sink_table_name: [String; 3],
    /// Names of the source tables read by the flow.
    /// NOTE(review): presumably also `[catalog, schema, table]` — confirm.
    pub source_table_names: HashSet<[String; 3]>,
    /// Catalog manager used to look up tables (e.g. sink table existence).
    pub catalog_manager: CatalogManagerRef,
    /// Whether the query is SQL or TQL.
    pub query_type: QueryType,
    /// Batching-mode options shared with the task.
    pub batch_opts: Arc<BatchingModeOptions>,
    /// Optional evaluation interval of the flow.
    pub flow_eval_interval: Option<Duration>,
    /// Typed schedule configuration, pre-parsed at task creation time.
    pub eval_schedule: Option<EvalSchedule>,
}
101
102fn determine_query_type(query: &str, query_ctx: &QueryContextRef) -> Result<QueryType, Error> {
103    let is_tql = is_tql(query_ctx.sql_dialect(), query)
104        .map_err(BoxedError::new)
105        .context(ExternalSnafu)?;
106    Ok(if is_tql {
107        QueryType::Tql
108    } else {
109        QueryType::Sql
110    })
111}
112
113fn is_merge_mode_last_non_null(options: &HashMap<String, String>) -> bool {
114    options
115        .get(MERGE_MODE_KEY)
116        .map(|mode| mode.eq_ignore_ascii_case("last_non_null"))
117        .unwrap_or(false)
118}
119
120fn encode_insert_plan_request(
121    insert_to: TableName,
122    insert_input_plan: &LogicalPlan,
123) -> Result<api::v1::QueryRequest, Error> {
124    let message = DFLogicalSubstraitConvertor {}
125        .encode(insert_input_plan, DefaultSerializer)
126        .context(SubstraitEncodeLogicalPlanSnafu)?;
127    Ok(api::v1::QueryRequest {
128        query: Some(api::v1::query_request::Query::InsertIntoPlan(
129            api::v1::InsertIntoPlan {
130                table_name: Some(insert_to),
131                logical_plan: message.to_vec(),
132            },
133        )),
134    })
135}
136
137fn format_insert_target_columns(plan: &LogicalPlan) -> String {
138    plan.schema()
139        .fields()
140        .iter()
141        .map(|field| quote_identifier(field.name()).to_string())
142        .collect::<Vec<_>>()
143        .join(", ")
144}
145
/// A batching-mode flow task.
///
/// Cloning is cheap: every field is an `Arc`, so clones share the same
/// config, state and execution lock.
#[derive(Clone)]
pub struct BatchingTask {
    /// Configuration of the task, shared between clones.
    pub config: Arc<TaskConfig>,
    /// Mutable runtime state (query context, dirty time windows, ...),
    /// guarded by a synchronous `RwLock`.
    pub state: Arc<RwLock<TaskState>>,
    /// Serializes plan generation, execution, checkpoint advancement, and dirty
    /// window restoration for this flow. Without this, a manual flush and the
    /// background loop can process the same checkpoint range concurrently.
    execution_lock: Arc<Mutex<()>>,
}
155
/// Arguments for creating batching task
pub struct TaskArgs<'a> {
    /// Identifier of the flow.
    pub flow_id: FlowId,
    /// The flow's query text.
    pub query: &'a str,
    /// Logical plan of the query; its schema becomes the task's output schema.
    pub plan: LogicalPlan,
    /// Optional time-window expression of the flow.
    pub time_window_expr: Option<TimeWindowExpr>,
    /// Optional expiration, stored in the task config as seconds.
    pub expire_after: Option<i64>,
    /// Sink table name, indexed as `[catalog, schema, table]`.
    pub sink_table_name: [String; 3],
    /// Source table names; collected into a set when the task is created.
    pub source_table_names: Vec<[String; 3]>,
    /// Query context used to build the task state and to detect the query type.
    pub query_ctx: QueryContextRef,
    /// Catalog manager used for table lookups.
    pub catalog_manager: CatalogManagerRef,
    /// Shutdown signal receiver handed to the task state.
    pub shutdown_rx: oneshot::Receiver<()>,
    /// Batching-mode options.
    pub batch_opts: Arc<BatchingModeOptions>,
    /// Optional evaluation interval of the flow.
    pub flow_eval_interval: Option<Duration>,
    /// Typed schedule configuration pre-parsed from `CreateFlowArgs`.
    pub eval_schedule: Option<EvalSchedule>,
}
173
/// A generated plan together with the bookkeeping needed to execute it and
/// to recover if the run fails.
pub struct PlanInfo {
    /// The logical plan to execute.
    pub plan: LogicalPlan,
    /// What to put back into the dirty-window state if the run fails.
    pub dirty_restore: DirtyRestore,
    /// What range/kind of data this query covers.
    pub coverage: QueryCoverage,
}
179
/// Describes what a generated query covers, which in turn selects the scan
/// extensions, snapshot bounds and checkpoint handling used for the run.
#[derive(Clone)]
pub enum QueryCoverage {
    /// Explicit full-query snapshot coverage, e.g. TQL or evaluation-interval
    /// SQL flows whose plan shape cannot be safely dirty-window pruned. This
    /// must not be used as an implicit recovery path for scoped repair or an
    /// unsafe incremental rewrite fallback.
    UnfilteredFull,
    /// Scoped full-snapshot repair over the current dirty windows. A successful
    /// result may start a fenced repair if new dirty windows appeared meanwhile.
    ScopedBaseRepair,
    /// A chunk of windows being repaired under the frozen high-watermark `H`.
    /// The `high` map is sent as snapshot read bounds and must be matched by
    /// the returned terminal watermarks before checkpoints can advance.
    FencedRepairChunk { high: BTreeMap<u64, u64> },
    /// Incremental delta query over `(checkpoint, scan-open snapshot]`.
    IncrementalDelta,
}
197
198impl QueryCoverage {
199    /// Whether this query should use incremental scan extensions and
200    /// incremental checkpoint advancement rules.
201    fn is_incremental_delta(&self) -> bool {
202        matches!(self, Self::IncrementalDelta)
203    }
204
205    /// Snapshot upper bounds requested from the storage layer. Only fenced
206    /// repair chunks carry bounds; all other coverage relies on normal scans.
207    fn snapshot_seqs(&self) -> HashMap<u64, u64> {
208        match self {
209            Self::FencedRepairChunk { high } => high.iter().map(|(k, v)| (*k, *v)).collect(),
210            _ => HashMap::new(),
211        }
212    }
213}
214
/// What must be restored into the task's dirty-window state when a generated
/// query fails, so that the consumed dirty signal is retried later.
pub enum DirtyRestore {
    /// The query was scoped to dirty time ranges; restore those ranges if the
    /// run fails.
    Scoped(FilterExprInfo),
    /// The query could not be scoped to dirty time ranges, so the dirty-window
    /// state is only a dirty signal. Restore the consumed signal if the full
    /// run fails.
    ///
    /// TODO(discord9): Full-query runs only need a dirty bool flag. Refactor
    /// the unscoped path to stop reusing `DirtyTimeWindows` for this signal.
    Unscoped(DirtyTimeWindows),
}
227
/// Result of a single flow evaluation, keeping the generated query alongside
/// the execution result.
struct ExecuteOnceOutcome {
    /// The generated query, if one was produced. `None` when plan generation
    /// failed or when no query was generated.
    new_query: Option<PlanInfo>,
    /// Execution result of the generated insert plan.
    ///
    /// `Ok(Some((affected_rows, elapsed)))` means a query was executed.
    /// `Ok(None)` means no query was generated because there was no dirty signal.
    /// `Err(_)` means plan generation or execution failed.
    result: Result<Option<(usize, Duration)>, Error>,
}
237
238impl BatchingTask {
239    #[allow(clippy::too_many_arguments)]
240    pub fn try_new(
241        TaskArgs {
242            flow_id,
243            query,
244            plan,
245            time_window_expr,
246            expire_after,
247            sink_table_name,
248            source_table_names,
249            query_ctx,
250            catalog_manager,
251            shutdown_rx,
252            batch_opts,
253            flow_eval_interval,
254            eval_schedule,
255        }: TaskArgs<'_>,
256    ) -> Result<Self, Error> {
257        let mut state = TaskState::with_dirty_time_windows(
258            query_ctx.clone(),
259            shutdown_rx,
260            DirtyTimeWindows::new(
261                batch_opts.experimental_max_filter_num_per_query,
262                batch_opts.experimental_time_window_merge_threshold,
263            ),
264        );
265        if !batch_opts.experimental_enable_incremental_read {
266            state.disable_incremental();
267        }
268
269        Ok(Self {
270            config: Arc::new(TaskConfig {
271                flow_id,
272                query: query.to_string(),
273                time_window_expr,
274                expire_after,
275                sink_table_name,
276                source_table_names: source_table_names.into_iter().collect(),
277                catalog_manager,
278                output_schema: plan.schema().clone(),
279                query_type: determine_query_type(query, &query_ctx)?,
280                batch_opts,
281                flow_eval_interval,
282                eval_schedule,
283            }),
284            state: Arc::new(RwLock::new(state)),
285            execution_lock: Arc::new(Mutex::new(())),
286        })
287    }
288
289    pub fn last_execution_time_millis(&self) -> Option<i64> {
290        self.state.read().unwrap().last_execution_time_millis()
291    }
292
293    /// Collect flow-related extensions from the task's query context that should be
294    /// forwarded to the frontend (e.g. scheduled time).
295    fn frontend_extensions(&self) -> HashMap<String, String> {
296        let ctx = self.state.read().unwrap();
297        let all = ctx.query_ctx.extensions();
298        let mut flow_exts = HashMap::new();
299        // Propagate the scheduled time extension if present so that frontend
300        // execution can use the same logical time.
301        if let Some(v) = all.get(query::options::FLOW_SCHEDULED_TIME_MILLIS) {
302            flow_exts.insert(
303                query::options::FLOW_SCHEDULED_TIME_MILLIS.to_string(),
304                v.clone(),
305            );
306        }
307        flow_exts
308    }
309
310    /// mark time window range (now - expire_after, now) as dirty (or (0, now) if expire_after not set)
311    ///
312    /// useful for flush_flow to flush dirty time windows range
313    pub fn mark_all_windows_as_dirty(&self) -> Result<(), Error> {
314        let now = SystemTime::now();
315        let now = Timestamp::new_second(
316            now.duration_since(UNIX_EPOCH)
317                .expect("Time went backwards")
318                .as_secs() as _,
319        );
320        let lower_bound = self
321            .config
322            .expire_after
323            .map(|e| now.sub_duration(Duration::from_secs(e as _)))
324            .transpose()
325            .map_err(BoxedError::new)
326            .context(ExternalSnafu)?
327            .unwrap_or(Timestamp::new_second(0));
328        debug!(
329            "Flow {} mark range ({:?}, {:?}) as dirty",
330            self.config.flow_id, lower_bound, now
331        );
332        self.state
333            .write()
334            .unwrap()
335            .dirty_time_windows
336            .add_window(lower_bound, Some(now));
337        Ok(())
338    }
339
340    /// Create sink table if not exists
341    pub async fn check_or_create_sink_table(
342        &self,
343        engine: &QueryEngineRef,
344        frontend_client: &Arc<FrontendClient>,
345    ) -> Result<Option<(usize, Duration)>, Error> {
346        if !self.is_table_exist(&self.config.sink_table_name).await? {
347            let create_table = self.gen_create_table_expr(engine.clone()).await?;
348            info!(
349                "Try creating sink table(if not exists) with expr: {:?}",
350                create_table
351            );
352            self.create_table(frontend_client, create_table).await?;
353            info!(
354                "Sink table {}(if not exists) created",
355                self.config.sink_table_name.join(".")
356            );
357        }
358
359        Ok(None)
360    }
361
362    /// Validates that the sink table schema can accept this flow's output.
363    ///
364    /// This is a dry-run of the same schema matching logic used by insert-plan
365    /// generation, but without adding dirty-window filters or executing the query. It is used
366    /// during CREATE FLOW to catch existing sink table mismatches early.
367    pub async fn validate_sink_table_schema(&self, engine: &QueryEngineRef) -> Result<(), Error> {
368        let (table, _) = get_table_info_df_schema(
369            self.config.catalog_manager.clone(),
370            self.config.sink_table_name.clone(),
371        )
372        .await?;
373
374        let table_meta = &table.table_info().meta;
375        let merge_mode_last_non_null =
376            is_merge_mode_last_non_null(&table_meta.options.extra_options);
377        let primary_key_indices = table_meta.primary_key_indices.clone();
378        let query_ctx = self.state.read().unwrap().query_ctx.clone();
379
380        gen_plan_with_matching_schema(
381            &self.config.query,
382            query_ctx,
383            engine.clone(),
384            table_meta.schema.clone(),
385            &primary_key_indices,
386            merge_mode_last_non_null,
387        )
388        .await
389        .map(|_| ())
390    }
391
392    async fn is_table_exist(&self, table_name: &[String; 3]) -> Result<bool, Error> {
393        self.config
394            .catalog_manager
395            .table_exists(&table_name[0], &table_name[1], &table_name[2], None)
396            .await
397            .map_err(BoxedError::new)
398            .context(ExternalSnafu)
399    }
400
401    pub(crate) async fn execute_once_serialized(
402        &self,
403        engine: &QueryEngineRef,
404        frontend_client: &Arc<FrontendClient>,
405        max_window_cnt: Option<usize>,
406    ) -> Result<Option<(usize, Duration)>, Error> {
407        let outcome = self
408            .execute_once_serialized_with_outcome(engine, frontend_client, max_window_cnt)
409            .await;
410        outcome.result
411    }
412
413    /// Executes one flow evaluation under `execution_lock` and keeps the
414    /// generated query context for the background loop's error logging/backoff.
415    async fn execute_once_serialized_with_outcome(
416        &self,
417        engine: &QueryEngineRef,
418        frontend_client: &Arc<FrontendClient>,
419        max_window_cnt: Option<usize>,
420    ) -> ExecuteOnceOutcome {
421        let _execution_guard = self.execution_lock.lock().await;
422        self.execute_once_unlocked(engine, frontend_client, max_window_cnt)
423            .await
424    }
425
426    /// Executes one flow evaluation. Caller must hold `execution_lock`.
427    async fn execute_once_unlocked(
428        &self,
429        engine: &QueryEngineRef,
430        frontend_client: &Arc<FrontendClient>,
431        max_window_cnt: Option<usize>,
432    ) -> ExecuteOnceOutcome {
433        let new_query = match self.gen_insert_plan_unlocked(engine, max_window_cnt).await {
434            Ok(new_query) => new_query,
435            Err(err) => {
436                return ExecuteOnceOutcome {
437                    new_query: None,
438                    result: Err(err),
439                };
440            }
441        };
442
443        if let Some(new_query) = new_query {
444            debug!("Generate new query: {}", new_query.plan);
445            let res = self
446                .execute_logical_plan_unlocked(
447                    frontend_client,
448                    &new_query.plan,
449                    &new_query.dirty_restore,
450                    &new_query.coverage,
451                )
452                .await;
453            if res.is_err() {
454                self.handle_executed_query_failure(Some(&new_query));
455            }
456            ExecuteOnceOutcome {
457                new_query: Some(new_query),
458                result: res,
459            }
460        } else {
461            debug!("Generate no query");
462            ExecuteOnceOutcome {
463                new_query: None,
464                result: Ok(None),
465            }
466        }
467    }
468
469    /// Generates the insert plan. Caller must reach this through the serialized path.
470    async fn gen_insert_plan_unlocked(
471        &self,
472        engine: &QueryEngineRef,
473        max_window_cnt: Option<usize>,
474    ) -> Result<Option<PlanInfo>, Error> {
475        let (table, df_schema) = get_table_info_df_schema(
476            self.config.catalog_manager.clone(),
477            self.config.sink_table_name.clone(),
478        )
479        .await?;
480
481        let table_meta = &table.table_info().meta;
482        let merge_mode_last_non_null =
483            is_merge_mode_last_non_null(&table_meta.options.extra_options);
484        let primary_key_indices = table_meta.primary_key_indices.clone();
485
486        let new_query = self
487            .gen_query_with_time_window(
488                engine.clone(),
489                &table.table_info().meta.schema,
490                &primary_key_indices,
491                merge_mode_last_non_null,
492                max_window_cnt,
493            )
494            .await?;
495
496        let Some(new_query) = new_query else {
497            return Ok(None);
498        };
499
500        // first check if all columns in input query exists in sink table
501        // since insert into ref to names in record batch generate by given query
502        let table_columns = df_schema
503            .columns()
504            .into_iter()
505            .map(|c| c.name)
506            .collect::<BTreeSet<_>>();
507        for column in new_query.plan.schema().columns() {
508            if !table_columns.contains(column.name()) {
509                self.restore_dirty_windows_after_failure(&new_query);
510                return InvalidQuerySnafu {
511                    reason: format!(
512                        "Column {} not found in sink table with columns {:?}",
513                        column, table_columns
514                    ),
515                }
516                .fail();
517            }
518        }
519
520        let table_provider = Arc::new(DfTableProviderAdapter::new(table));
521        let table_source = Arc::new(DefaultTableSource::new(table_provider));
522
523        // update_at& time index placeholder (if exists) should have default value
524        let plan = LogicalPlan::Dml(DmlStatement::new(
525            datafusion_common::TableReference::Full {
526                catalog: self.config.sink_table_name[0].clone().into(),
527                schema: self.config.sink_table_name[1].clone().into(),
528                table: self.config.sink_table_name[2].clone().into(),
529            },
530            table_source,
531            WriteOp::Insert(datafusion_expr::dml::InsertOp::Append),
532            Arc::new(new_query.plan.clone()),
533        ));
534        let insert_into_info = PlanInfo {
535            plan,
536            dirty_restore: new_query.dirty_restore,
537            coverage: new_query.coverage,
538        };
539        let insert_into =
540            match insert_into_info
541                .plan
542                .clone()
543                .recompute_schema()
544                .context(DatafusionSnafu {
545                    context: "Failed to recompute schema",
546                }) {
547                Ok(insert_into) => insert_into,
548                Err(err) => {
549                    self.restore_dirty_windows_after_failure(&insert_into_info);
550                    return Err(err);
551                }
552            };
553
554        Ok(Some(PlanInfo {
555            plan: insert_into,
556            dirty_restore: insert_into_info.dirty_restore,
557            coverage: insert_into_info.coverage,
558        }))
559    }
560
561    pub async fn create_table(
562        &self,
563        frontend_client: &Arc<FrontendClient>,
564        expr: CreateTableExpr,
565    ) -> Result<(), Error> {
566        let catalog = &self.config.sink_table_name[0];
567        let schema = &self.config.sink_table_name[1];
568        frontend_client
569            .create(expr.clone(), catalog, schema)
570            .await?;
571        Ok(())
572    }
573
    /// Executes the insert plan. Caller must reach this through the serialized path.
    ///
    /// Steps, in order:
    /// 1. Fully qualify every table scan in `plan` with the sink table's
    ///    catalog and schema.
    /// 2. For incremental-delta coverage, try to rewrite the plan for
    ///    incremental execution; if no rewrite is available, restore the dirty
    ///    windows and return `Ok(None)` without executing anything.
    /// 3. Build the query extensions and derive the checkpoint mode from them.
    /// 4. Build the request (SQL text, `InsertIntoPlan`, or a plain encoded
    ///    logical plan) and send it to the frontend together with the snapshot
    ///    bounds taken from `coverage`.
    /// 5. Record failure/slow-query/row metrics and apply the result (or the
    ///    failure) to the task state.
    ///
    /// Returns `Ok(Some((affected_rows, elapsed)))` when the query ran
    /// successfully.
    ///
    /// # Errors
    ///
    /// Fails when the plan rewrite, extension building, plan encoding, or the
    /// frontend query itself fails.
    async fn execute_logical_plan_unlocked(
        &self,
        frontend_client: &Arc<FrontendClient>,
        plan: &LogicalPlan,
        dirty_restore: &DirtyRestore,
        coverage: &QueryCoverage,
    ) -> Result<Option<(usize, Duration)>, Error> {
        // Started before any plan preparation, so `elapsed` below covers
        // preparation as well as the frontend round trip.
        let instant = Instant::now();
        let flow_id = self.config.flow_id;

        debug!(
            "Executing flow {flow_id}(expire_after={:?} secs) with query {}",
            self.config.expire_after, &plan
        );

        let catalog = &self.config.sink_table_name[0];
        let schema = &self.config.sink_table_name[1];

        // Make every table reference fully qualified, i.e.
        // "table_name" => "catalog_name.schema_name.table_name".
        let plan = plan
            .clone()
            .transform_down_with_subqueries(|p| {
                if let LogicalPlan::TableScan(mut table_scan) = p {
                    let resolved = table_scan.table_name.resolve(catalog, schema);
                    table_scan.table_name = resolved.into();
                    Ok(Transformed::yes(LogicalPlan::TableScan(table_scan)))
                } else {
                    Ok(Transformed::no(p))
                }
            })
            .with_context(|_| DatafusionSnafu {
                context: format!("Failed to fix table ref in logical plan, plan={:?}", plan),
            })?
            .data;

        // For incremental-mode SQL queries, attempt to rewrite the delta aggregate
        // plan into a safe delta-LEFT-JOIN-sink form before deciding on extensions.
        let incremental_plan = if coverage.is_incremental_delta() {
            self.prepare_plan_for_incremental(&plan).await?
        } else {
            None
        };
        let incremental_safe = incremental_plan.is_some();
        // An incremental-delta query without a safe rewrite is never executed:
        // the consumed dirty signal is put back and the run is a no-op.
        if coverage.is_incremental_delta() && !incremental_safe {
            warn!(
                "Flow {flow_id} skipped unsafe incremental delta fallback; \
                 restored dirty signal instead of executing an unfiltered full snapshot"
            );
            self.restore_dirty_windows(dirty_restore);
            return Ok(None);
        }
        let plan = incremental_plan.unwrap_or_else(|| plan.clone());

        let extensions = self
            .build_flow_query_extensions(incremental_safe, coverage.is_incremental_delta())
            .await?;
        let frontend_extensions = self.frontend_extensions();
        // Borrowed view over both extension sets, in the shape the frontend
        // client call below takes.
        let extension_refs = extensions
            .iter()
            .map(|(key, value)| (*key, value.as_str()))
            .chain(
                frontend_extensions
                    .iter()
                    .map(|(key, value)| (key.as_str(), value.as_str())),
            )
            .collect::<Vec<_>>();
        // The checkpoint mode is derived from whether the incremental-mode
        // extension was actually emitted.
        let query_mode = if extensions
            .iter()
            .any(|(key, _)| *key == FLOW_INCREMENTAL_MODE)
        {
            CheckpointMode::Incremental
        } else {
            CheckpointMode::FullSnapshot
        };
        Self::record_query_mode(flow_id, query_mode);
        debug!(
            "Flow {flow_id} executing batching query with checkpoint_mode={}, extension_count={}",
            checkpoint_mode_label(query_mode),
            extensions.len()
        );

        let mut peer_desc = None;
        let res = {
            // Timer guard scoped to this block: it covers request building and
            // the frontend call.
            let _timer = METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME
                .with_label_values(&[flow_id.to_string().as_str()])
                .start_timer();

            let req = if let Some((insert_to, insert_input_plan)) =
                breakup_insert_plan(&plan, catalog, schema)
            {
                if query_mode == CheckpointMode::FullSnapshot
                    && matches!(self.config.query_type, QueryType::Sql)
                    && self.config.flow_eval_interval.is_some()
                    && self.config.time_window_expr.is_none()
                {
                    // Evaluation-interval SQL flows without a time-window
                    // expression execute as full-query snapshots. Send these
                    // as SQL text instead of Substrait to avoid logical-plan
                    // round-trip issues around complex joins/unions/CTEs and
                    // duplicate field aliases. Keep ordinary SQL full snapshots
                    // on the existing InsertIntoPlan path because SQL unparsing
                    // is not valid for every planned aggregate shape yet.
                    // If the local SQL unparser does not support this plan,
                    // keep the previous InsertIntoPlan transport as a fallback.
                    match df_plan_to_sql(&insert_input_plan) {
                        Ok(select_sql) => {
                            let target_columns = format_insert_target_columns(&insert_input_plan);
                            let sql = format!(
                                "INSERT INTO {} ({}) {}",
                                TableReference::full(
                                    insert_to.catalog_name.as_str(),
                                    insert_to.schema_name.as_str(),
                                    insert_to.table_name.as_str(),
                                )
                                .to_quoted_string(),
                                target_columns,
                                select_sql
                            );
                            api::v1::QueryRequest {
                                query: Some(api::v1::query_request::Query::Sql(sql)),
                            }
                        }
                        Err(err) => {
                            warn!(
                                "Failed to unparse full-snapshot SQL flow {} plan; \
                                 falling back to InsertIntoPlan: {:?}",
                                flow_id, err
                            );
                            encode_insert_plan_request(insert_to, &insert_input_plan)?
                        }
                    }
                } else {
                    encode_insert_plan_request(insert_to, &insert_input_plan)?
                }
            } else {
                // The plan could not be broken up into an insert target plus
                // input plan: send the whole plan encoded as-is.
                let message = DFLogicalSubstraitConvertor {}
                    .encode(&plan, DefaultSerializer)
                    .context(SubstraitEncodeLogicalPlanSnafu)?;

                api::v1::QueryRequest {
                    query: Some(api::v1::query_request::Query::LogicalPlan(message.to_vec())),
                }
            };

            let snapshot_seqs = coverage.snapshot_seqs();
            frontend_client
                .query_with_terminal_metrics(
                    catalog,
                    schema,
                    req,
                    &extension_refs,
                    &snapshot_seqs,
                    &mut peer_desc,
                )
                .await
        };

        let elapsed = instant.elapsed();
        // Fall back to the default peer description when the client did not
        // report one.
        let peer_label = peer_desc
            .as_ref()
            .map(ToString::to_string)
            .unwrap_or_else(|| PeerDesc::default().to_string());
        if let Err(err) = &res {
            warn!(
                "Failed to execute Flow {flow_id} on frontend {peer_label}, result: {err:?}, elapsed: {:?} with query: {}",
                elapsed, &plan
            );
            // The state write lock is confined to this block, so it is
            // released before the decision is recorded.
            let decision = {
                let mut state = self.state.write().unwrap();
                let reason = Self::query_failure_reason(err, coverage);
                Self::apply_query_failure_to_state(&mut state, elapsed, coverage, reason)
            };
            if let Some(decision) = decision {
                Self::record_checkpoint_decision(flow_id, decision);
            }
        }

        // record slow query (for both successful and failed runs)
        if elapsed >= self.config.batch_opts.slow_query_threshold {
            warn!(
                "Flow {flow_id} on frontend {peer_label} executed for {:?} before complete, query: {}",
                elapsed, &plan
            );
            let flow_id = flow_id.to_string();
            METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY
                .with_label_values(&[flow_id.as_str(), peer_label.as_str()])
                .observe(elapsed.as_secs_f64());
        }

        // A failed query is only propagated here, after the failure and
        // slow-query bookkeeping above has run.
        let res = res?;
        let (affected_rows, _) = res.output.extract_rows_and_cost();
        debug!(
            "Flow {flow_id} executed, affected_rows: {affected_rows:?}, elapsed: {:?}, watermark: {:?}",
            elapsed,
            res.region_watermark_map()
        );
        METRIC_FLOW_ROWS
            .with_label_values(&[format!("{}-out-batching", flow_id).as_str()])
            .inc_by(affected_rows as _);
        let decision = {
            let mut state = self.state.write().unwrap();
            Self::apply_query_result_to_state(&mut state, &res, elapsed, coverage)
        };
        Self::record_checkpoint_decision(flow_id, decision);

        Ok(Some((affected_rows, elapsed)))
    }
782
783    /// Restore dirty windows consumed by a failed query so they are retried on
784    /// the next execution.
785    ///
786    fn restore_dirty_windows(&self, dirty_restore: &DirtyRestore) {
787        match dirty_restore {
788            DirtyRestore::Scoped(filter) => self.restore_scoped_dirty_windows(filter),
789            DirtyRestore::Unscoped(dirty_windows) => self
790                .state
791                .write()
792                .unwrap()
793                .dirty_time_windows
794                .add_dirty_windows(dirty_windows),
795        }
796    }
797
798    /// Restore the dirty signal for a plan that was generated but failed before
799    /// it could prove any checkpoint advancement.
800    fn restore_dirty_windows_after_failure(&self, query: &PlanInfo) {
801        self.restore_dirty_windows(&query.dirty_restore);
802    }
803
804    /// Restore scoped windows through `TaskState` so fenced repair can decide
805    /// whether they go back to pending repair or live dirty state.
806    fn restore_scoped_dirty_windows(&self, filter: &FilterExprInfo) {
807        self.state.write().unwrap().restore_scoped_windows(filter);
808    }
809
810    /// Run a fallible scoped operation and restore its consumed windows if plan
811    /// generation/rewrite fails before execution.
812    fn restore_scoped_dirty_windows_on_err<T>(
813        &self,
814        filter: &FilterExprInfo,
815        result: Result<T, Error>,
816    ) -> Result<T, Error> {
817        result.inspect_err(|_| {
818            self.restore_scoped_dirty_windows(filter);
819        })
820    }
821
822    /// Restore an unscoped dirty signal consumed by an explicit full-query or
823    /// incremental-delta plan.
824    fn restore_unscoped_dirty_windows(&self, dirty_windows: &DirtyTimeWindows) {
825        self.state
826            .write()
827            .unwrap()
828            .dirty_time_windows
829            .add_dirty_windows(dirty_windows);
830    }
831
832    /// Run a fallible unscoped operation and restore the dirty signal if it
833    /// fails before a query is executed.
834    fn restore_unscoped_dirty_windows_on_err<T>(
835        &self,
836        dirty_windows: &DirtyTimeWindows,
837        result: Result<T, Error>,
838    ) -> Result<T, Error> {
839        result.inspect_err(|_| {
840            self.restore_unscoped_dirty_windows(dirty_windows);
841        })
842    }
843
844    /// Consume the live dirty signal for an unscoped query while keeping a copy
845    /// that can be restored if planning or execution fails.
846    fn drain_dirty_windows_signal(&self) -> (bool, DirtyTimeWindows) {
847        let mut state = self.state.write().unwrap();
848        let dirty_windows_to_restore = state.dirty_time_windows.clone();
849        let is_dirty = !dirty_windows_to_restore.is_empty();
850        state.dirty_time_windows.clean();
851        (is_dirty, dirty_windows_to_restore)
852    }
853
854    #[allow(clippy::too_many_arguments)]
855    /// Build an unfiltered plan for explicit full-query or incremental-delta
856    /// coverage. Callers pass the consumed dirty signal for failure restoration.
857    async fn gen_unfiltered_plan_info(
858        &self,
859        engine: QueryEngineRef,
860        query_ctx: QueryContextRef,
861        sink_table_schema: Arc<Schema>,
862        primary_key_indices: &[usize],
863        allow_partial: bool,
864        dirty_windows_to_restore: DirtyTimeWindows,
865        retention_filter: Option<(&str, Timestamp, &'static str)>,
866        coverage: QueryCoverage,
867    ) -> Result<PlanInfo, Error> {
868        let mut plan = self.restore_unscoped_dirty_windows_on_err(
869            &dirty_windows_to_restore,
870            gen_plan_with_matching_schema(
871                &self.config.query,
872                query_ctx,
873                engine,
874                sink_table_schema,
875                primary_key_indices,
876                allow_partial,
877            )
878            .await,
879        )?;
880
881        if let Some((col_name, lower_bound, context)) = retention_filter {
882            let lower = self.restore_unscoped_dirty_windows_on_err(
883                &dirty_windows_to_restore,
884                to_df_literal(lower_bound),
885            )?;
886            let retention_filter = col(col_name).gt_eq(lit(lower));
887            let mut add_filter = AddFilterRewriter::new(retention_filter);
888            plan = self.restore_unscoped_dirty_windows_on_err(
889                &dirty_windows_to_restore,
890                plan.clone()
891                    .rewrite(&mut add_filter)
892                    .with_context(|_| DatafusionSnafu {
893                        context: format!(
894                            "Failed to apply {context} expire_after filter to plan:\n {}\n",
895                            plan
896                        ),
897                    })
898                    .map(|rewrite| rewrite.data),
899            )?;
900        }
901
902        Ok(PlanInfo {
903            plan,
904            dirty_restore: DirtyRestore::Unscoped(dirty_windows_to_restore),
905            coverage,
906        })
907    }
908
909    #[allow(clippy::too_many_arguments)]
910    /// Build an unfiltered plan only when the live dirty signal was present;
911    /// otherwise skip this round without querying.
912    async fn gen_unfiltered_plan_info_if_dirty(
913        &self,
914        engine: QueryEngineRef,
915        query_ctx: QueryContextRef,
916        sink_table_schema: Arc<Schema>,
917        primary_key_indices: &[usize],
918        allow_partial: bool,
919        retention_filter: Option<(&str, Timestamp, &'static str)>,
920        coverage: QueryCoverage,
921    ) -> Result<Option<PlanInfo>, Error> {
922        let (is_dirty, dirty_windows_to_restore) = self.drain_dirty_windows_signal();
923        if !is_dirty {
924            debug!("Flow id={:?}, no new data, not update", self.config.flow_id);
925            return Ok(None);
926        }
927
928        self.gen_unfiltered_plan_info(
929            engine,
930            query_ctx,
931            sink_table_schema,
932            primary_key_indices,
933            allow_partial,
934            dirty_windows_to_restore,
935            retention_filter,
936            coverage,
937        )
938        .await
939        .map(Some)
940    }
941
942    fn handle_executed_query_failure(&self, query: Option<&PlanInfo>) {
943        if let Some(query) = query {
944            self.restore_dirty_windows_after_failure(query);
945        }
946    }
947
948    /// start executing query in a loop, break when receive shutdown signal
949    ///
950    /// any error will be logged when executing query.
951    ///
952    /// Dispatches to:
953    /// - scheduled loop when `flow_eval_interval.is_some()`
954    /// - adaptive dirty-window loop otherwise
955    pub async fn start_executing_loop(
956        &self,
957        engine: QueryEngineRef,
958        frontend_client: Arc<FrontendClient>,
959    ) {
960        if self.config.flow_eval_interval.is_some() {
961            self.start_scheduled_loop(engine, frontend_client).await;
962        } else {
963            self.start_adaptive_loop(engine, frontend_client).await;
964        }
965    }
966
967    /// Scheduled batching loop for flows with `EVAL INTERVAL`.
968    ///
969    /// Uses the pre-parsed `EvalSchedule` from `TaskConfig` and selects due
970    /// scheduled times using bounded catch-up semantics. Each scheduled time is the
971    /// scheduled evaluation time used as logical `now()` for that attempt.
972    /// Each attempt temporarily sets `flow.scheduled_time_millis` on the
973    /// task's `QueryContext` and executes under the existing `execution_lock`.
974    /// After every attempt (success, no-op, or failure) the in-memory
975    /// cursor advances.
976    async fn start_scheduled_loop(
977        &self,
978        engine: QueryEngineRef,
979        frontend_client: Arc<FrontendClient>,
980    ) {
981        let flow_id_str = self.config.flow_id.to_string();
982
983        let schedule = match &self.config.eval_schedule {
984            Some(s) => s.clone(),
985            None => {
986                let eval_interval_secs = self
987                    .config
988                    .flow_eval_interval
989                    .map(|d| d.as_secs() as i64)
990                    .expect("checked by caller");
991
992                // Fallback: no typed config provided. Compute defaults
993                // anchored at epoch/start=0.
994                match EvalSchedule::from_config(Some(eval_interval_secs), None) {
995                    Ok(Some(s)) => s,
996                    Ok(None) => {
997                        warn!(
998                            "Flow {}: EVAL INTERVAL set but no schedule parsed; exiting loop",
999                            flow_id_str
1000                        );
1001                        return;
1002                    }
1003                    Err(e) => {
1004                        warn!(
1005                            "Flow {}: Failed to parse eval schedule: {}; exiting loop",
1006                            flow_id_str, e
1007                        );
1008                        return;
1009                    }
1010                }
1011            }
1012        };
1013
1014        // Initial cursor is one interval before start so the first due
1015        // scheduled time is `start_secs`.
1016        let mut cursor_secs = schedule.start_secs.saturating_sub(schedule.interval_secs);
1017
1018        info!(
1019            "Flow {}: entering scheduled loop, interval={}s, start={}, anchor={}, policy={:?}, max_runs={}, max_lag={}s",
1020            flow_id_str,
1021            schedule.interval_secs,
1022            schedule.start_secs,
1023            schedule.anchor_secs,
1024            schedule.missed_tick_policy,
1025            schedule.max_runs,
1026            schedule.max_lag_secs,
1027        );
1028
1029        loop {
1030            if self.is_shutdown_signaled() {
1031                break;
1032            }
1033
1034            let wall_now_secs = wall_clock_unix_secs();
1035
1036            let due = match select_due_scheduled_times(&schedule, cursor_secs, wall_now_secs) {
1037                Some(d) => d,
1038                None => {
1039                    warn!(
1040                        "Flow {}: Invalid schedule (interval <= 0), exiting loop",
1041                        flow_id_str
1042                    );
1043                    return;
1044                }
1045            };
1046
1047            if due.scheduled_times_secs.is_empty() {
1048                if due.skipped > 0 {
1049                    warn!(
1050                        "Flow {}: all {} due scheduled times skipped by max-lag, advancing cursor to wall-clock ({wall_now_secs}) to avoid re-skipping",
1051                        flow_id_str, due.skipped
1052                    );
1053                    cursor_secs = wall_now_secs;
1054                    continue;
1055                }
1056
1057                // No due yet — sleep until the next scheduled time.
1058                let next = schedule.next_scheduled_time_after(cursor_secs);
1059                if next <= wall_now_secs {
1060                    // Shouldn't happen given select_due_scheduled_times returned empty,
1061                    // but guard against clock skew / logic error.
1062                    cursor_secs = wall_now_secs;
1063                    continue;
1064                }
1065                let wait_secs = (next - wall_now_secs) as u64;
1066                let wait_dur = Duration::from_secs(wait_secs);
1067                debug!(
1068                    "Flow {}: no due scheduled times, sleeping for {}s until next scheduled time at {}",
1069                    flow_id_str, wait_secs, next
1070                );
1071                tokio::time::sleep(wait_dur).await;
1072                continue;
1073            }
1074
1075            if due.skipped > 0 {
1076                info!(
1077                    "Flow {}: {} due scheduled times, {} skipped (catch-up)",
1078                    flow_id_str,
1079                    due.scheduled_times_secs.len(),
1080                    due.skipped
1081                );
1082            }
1083
1084            // Execute scheduled times oldest → newest.
1085            for scheduled_time_secs in &due.scheduled_times_secs {
1086                if self.is_shutdown_signaled() {
1087                    break;
1088                }
1089
1090                METRIC_FLOW_BATCHING_ENGINE_START_QUERY_CNT
1091                    .with_label_values(&[&flow_id_str])
1092                    .inc();
1093
1094                let outcome = self
1095                    .execute_once_serialized_at_scheduled_time(
1096                        &engine,
1097                        &frontend_client,
1098                        *scheduled_time_secs,
1099                    )
1100                    .await;
1101
1102                // Advance cursor regardless of outcome.
1103                cursor_secs = *scheduled_time_secs;
1104
1105                match outcome.result {
1106                    Ok(Some((rows, elapsed))) => {
1107                        debug!(
1108                            "Flow {}: scheduled time {} completed, rows={}, elapsed={:?}",
1109                            flow_id_str, scheduled_time_secs, rows, elapsed
1110                        );
1111                    }
1112                    Ok(None) => {
1113                        debug!(
1114                            "Flow {}: scheduled time {} produced no query (no dirty signal or no-op)",
1115                            flow_id_str, scheduled_time_secs
1116                        );
1117                    }
1118                    Err(err) => {
1119                        warn!(
1120                            "Flow {}: scheduled time {} failed: {:?}",
1121                            flow_id_str, scheduled_time_secs, err
1122                        );
1123                        METRIC_FLOW_BATCHING_ENGINE_ERROR_CNT
1124                            .with_label_values(&[&flow_id_str])
1125                            .inc();
1126                        // Dirty-window restoration is handled by the
1127                        // existing `handle_executed_query_failure` inside
1128                        // `execute_once_unlocked`.
1129                    }
1130                }
1131            }
1132        }
1133    }
1134
1135    /// Existing adaptive dirty-window loop for flows without `EVAL INTERVAL`.
1136    async fn start_adaptive_loop(
1137        &self,
1138        engine: QueryEngineRef,
1139        frontend_client: Arc<FrontendClient>,
1140    ) {
1141        let flow_id_str = self.config.flow_id.to_string();
1142        let mut max_window_cnt = None;
1143        loop {
1144            if self.is_shutdown_signaled() {
1145                break;
1146            }
1147            METRIC_FLOW_BATCHING_ENGINE_START_QUERY_CNT
1148                .with_label_values(&[&flow_id_str])
1149                .inc();
1150
1151            let min_refresh = self.config.batch_opts.experimental_min_refresh_duration;
1152
1153            let outcome = self
1154                .execute_once_serialized_with_outcome(&engine, &frontend_client, max_window_cnt)
1155                .await;
1156
1157            match outcome.result {
1158                Ok(Some(_)) => {
1159                    max_window_cnt = max_window_cnt.map(|cnt| {
1160                        (cnt + 1).min(self.config.batch_opts.experimental_max_filter_num_per_query)
1161                    });
1162
1163                    let sleep_until = {
1164                        let state = self.state.write().unwrap();
1165
1166                        let time_window_size = self
1167                            .config
1168                            .time_window_expr
1169                            .as_ref()
1170                            .and_then(|t| *t.time_window_size());
1171
1172                        let prefer_short_incremental_cadence = state.checkpoint_mode()
1173                            == CheckpointMode::Incremental
1174                            && !state.is_incremental_disabled();
1175
1176                        state.get_next_start_query_time(
1177                            self.config.flow_id,
1178                            &time_window_size,
1179                            min_refresh,
1180                            Some(self.config.batch_opts.query_timeout),
1181                            self.config.batch_opts.experimental_max_filter_num_per_query,
1182                            prefer_short_incremental_cadence,
1183                        )
1184                    };
1185
1186                    tokio::time::sleep_until(sleep_until).await;
1187                }
1188                Ok(None) => {
1189                    debug!(
1190                        "Flow id = {:?} found no new data, sleep for {:?} then continue",
1191                        self.config.flow_id, min_refresh
1192                    );
1193                    tokio::time::sleep(min_refresh).await;
1194                    continue;
1195                }
1196                Err(err) => {
1197                    METRIC_FLOW_BATCHING_ENGINE_ERROR_CNT
1198                        .with_label_values(&[&flow_id_str])
1199                        .inc();
1200                    match outcome.new_query {
1201                        Some(query) => {
1202                            common_telemetry::error!(err; "Failed to execute query for flow={} with query: {}", self.config.flow_id, query.plan);
1203                            max_window_cnt = Some(1);
1204                        }
1205                        None => {
1206                            common_telemetry::error!(err; "Failed to generate query for flow={}", self.config.flow_id)
1207                        }
1208                    }
1209                    tokio::time::sleep(min_refresh).await;
1210                }
1211            }
1212        }
1213    }
1214
1215    /// Check whether the shutdown signal has been received.
1216    fn is_shutdown_signaled(&self) -> bool {
1217        let mut state = self.state.write().unwrap();
1218        match state.shutdown_rx.try_recv() {
1219            Ok(()) | Err(TryRecvError::Closed) => true,
1220            Err(TryRecvError::Empty) => false,
1221        }
1222    }
1223
1224    /// Execute one scheduled attempt, temporarily setting
1225    /// `flow.scheduled_time_millis` on the task's QueryContext so
1226    /// SQL/TQL `now()` resolves to the logical scheduled time.
1227    ///
1228    /// The extension is removed after the attempt so a later manual
1229    /// `flush_flow` does not reuse a stale scheduled time.
1230    async fn execute_once_serialized_at_scheduled_time(
1231        &self,
1232        engine: &QueryEngineRef,
1233        frontend_client: &Arc<FrontendClient>,
1234        scheduled_time_secs: i64,
1235    ) -> ExecuteOnceOutcome {
1236        let _execution_guard = self.execution_lock.lock().await;
1237
1238        struct QueryContextRestoreGuard {
1239            state: Arc<RwLock<TaskState>>,
1240            old_ctx: Option<QueryContextRef>,
1241        }
1242
1243        impl Drop for QueryContextRestoreGuard {
1244            fn drop(&mut self) {
1245                let Some(old_ctx) = self.old_ctx.take() else {
1246                    return;
1247                };
1248                if let Ok(mut state) = self.state.write() {
1249                    state.query_ctx = old_ctx;
1250                }
1251            }
1252        }
1253
1254        // Clone the current QueryContext and add the scheduled time
1255        // extension, then swap it into the task state for this attempt.
1256        let old_ctx = {
1257            let mut state = self.state.write().unwrap();
1258            let old = state.query_ctx.clone();
1259            let mut new_ctx = (*old).clone();
1260            new_ctx.set_extension(
1261                query::options::FLOW_SCHEDULED_TIME_MILLIS,
1262                (scheduled_time_secs.saturating_mul(1000)).to_string(),
1263            );
1264            state.query_ctx = Arc::new(new_ctx);
1265            old
1266        };
1267        let restore_guard = QueryContextRestoreGuard {
1268            state: self.state.clone(),
1269            old_ctx: Some(old_ctx),
1270        };
1271
1272        let outcome = self
1273            .execute_once_unlocked(engine, frontend_client, None)
1274            .await;
1275
1276        // Restore while still holding `execution_lock` so no future manual
1277        // flush can observe the temporary scheduled time. The guard also
1278        // restores during unwind/cancellation.
1279        drop(restore_guard);
1280
1281        outcome
1282    }
1283
1284    /// Generate the create table SQL
1285    ///
1286    /// the auto created table will automatically added a `update_at` Milliseconds DEFAULT now() column in the end
1287    /// (for compatibility with flow streaming mode)
1288    ///
1289    /// and it will use first timestamp column as time index, all other columns will be added as normal columns and nullable
1290    async fn gen_create_table_expr(
1291        &self,
1292        engine: QueryEngineRef,
1293    ) -> Result<CreateTableExpr, Error> {
1294        let query_ctx = self.state.read().unwrap().query_ctx.clone();
1295        let plan =
1296            sql_to_df_plan(query_ctx.clone(), engine.clone(), &self.config.query, true).await?;
1297        create_table_with_expr(&plan, &self.config.sink_table_name, &self.config.query_type)
1298    }
1299
1300    /// Incremental delta scans are unfiltered by dirty windows; the sequence
1301    /// range, not a time predicate, defines source correctness.
1302    fn should_use_unfiltered_incremental_delta(&self) -> bool {
1303        let state = self.state.read().unwrap();
1304        state.checkpoint_mode() == CheckpointMode::Incremental
1305            && !state.is_incremental_disabled()
1306            && matches!(self.config.query_type, QueryType::Sql)
1307    }
1308
1309    /// Generate the next plan and classify its coverage so checkpoint handling
1310    /// knows whether it is full-query, scoped repair, fenced repair, or delta.
1311    async fn gen_query_with_time_window(
1312        &self,
1313        engine: QueryEngineRef,
1314        sink_table_schema: &Arc<Schema>,
1315        primary_key_indices: &[usize],
1316        allow_partial: bool,
1317        max_window_cnt: Option<usize>,
1318    ) -> Result<Option<PlanInfo>, Error> {
1319        let query_ctx = self.state.read().unwrap().query_ctx.clone();
1320        let start = SystemTime::now();
1321        let since_the_epoch = start
1322            .duration_since(UNIX_EPOCH)
1323            .expect("Time went backwards");
1324        let low_bound = self
1325            .config
1326            .expire_after
1327            .map(|e| since_the_epoch.as_secs() - e as u64)
1328            .unwrap_or(u64::MIN);
1329
1330        let low_bound = Timestamp::new_second(low_bound as i64);
1331
1332        let expire_time_window_bound = self
1333            .config
1334            .time_window_expr
1335            .as_ref()
1336            .map(|expr| expr.eval(low_bound))
1337            .transpose()?;
1338
1339        let (expire_lower_bound, expire_upper_bound) = match (
1340            expire_time_window_bound,
1341            &self.config.query_type,
1342        ) {
1343            (Some((Some(l), Some(u))), QueryType::Sql) => (l, u),
1344            (None, QueryType::Sql) if self.config.flow_eval_interval.is_none() => {
1345                return UnexpectedSnafu {
1346                    reason: format!(
1347                        "Flow id={} reached execution without a time-window expression or EVAL INTERVAL; create-flow validation should have rejected it",
1348                        self.config.flow_id
1349                    ),
1350                }
1351                .fail();
1352            }
1353            _ => {
1354                // Explicit full-query flows (TQL and evaluation-interval SQL
1355                // plans whose shape cannot be safely dirty-window pruned) are
1356                // allowed to run as unfiltered full snapshots. This is distinct
1357                // from using unfiltered full as a fallback after scoped repair or
1358                // incremental rewrite failed.
1359                let (_, dirty_windows_to_restore) = self.drain_dirty_windows_signal();
1360
1361                let plan_info = self
1362                    .gen_unfiltered_plan_info(
1363                        engine,
1364                        query_ctx,
1365                        sink_table_schema.clone(),
1366                        primary_key_indices,
1367                        allow_partial,
1368                        dirty_windows_to_restore,
1369                        None,
1370                        QueryCoverage::UnfilteredFull,
1371                    )
1372                    .await?;
1373
1374                return Ok(Some(plan_info));
1375            }
1376        };
1377
1378        debug!(
1379            "Flow id = {:?}, found time window: precise_lower_bound={:?}, precise_upper_bound={:?} with dirty time windows: {:?}",
1380            self.config.flow_id,
1381            expire_lower_bound,
1382            expire_upper_bound,
1383            self.state.read().unwrap().dirty_time_windows
1384        );
1385        let window_size = expire_upper_bound
1386            .sub(&expire_lower_bound)
1387            .with_context(|| UnexpectedSnafu {
1388                reason: format!(
1389                    "Can't get window size from {expire_upper_bound:?} - {expire_lower_bound:?}"
1390                ),
1391            })?;
1392        let col_name = self
1393            .config
1394            .time_window_expr
1395            .as_ref()
1396            .map(|expr| expr.column_name.clone())
1397            .with_context(|| UnexpectedSnafu {
1398                reason: format!(
1399                    "Flow id={:?}, Failed to get column name from time window expr",
1400                    self.config.flow_id
1401                ),
1402            })?;
1403
1404        if self.should_use_unfiltered_incremental_delta() {
1405            // In incremental mode, source correctness is defined by the
1406            // per-region sequence range `(checkpoint, scan-open snapshot]`, not
1407            // by dirty-window predicates. Dirty windows are only a scheduling
1408            // signal here. Applying a stale dirty-window filter to the source can
1409            // exclude rows that are inside the returned watermark and make a
1410            // checkpoint advance skip them forever. The sink side is also left
1411            // unfiltered by dirty windows; the incremental rewrite joins the
1412            // delta groups with the full sink state for correctness. Future
1413            // dynamic filters can prune sink reads as a pure optimization.
1414            let retention_filter = self
1415                .config
1416                .expire_after
1417                .map(|_| (col_name.as_str(), expire_lower_bound, "incremental"));
1418            return self
1419                .gen_unfiltered_plan_info_if_dirty(
1420                    engine,
1421                    query_ctx,
1422                    sink_table_schema.clone(),
1423                    primary_key_indices,
1424                    allow_partial,
1425                    retention_filter,
1426                    QueryCoverage::IncrementalDelta,
1427                )
1428                .await;
1429        }
1430
1431        let (expr, coverage) = {
1432            let mut state = self.state.write().unwrap();
1433            let window_cnt = max_window_cnt
1434                .unwrap_or(self.config.batch_opts.experimental_max_filter_num_per_query);
1435            let expr = state.gen_scoped_filter_exprs(
1436                &col_name,
1437                Some(expire_lower_bound),
1438                window_size,
1439                window_cnt,
1440                self.config.flow_id,
1441                Some(self),
1442            )?;
1443            let repair_high = state
1444                .pending_fenced_repair()
1445                .map(|repair| repair.high().clone());
1446            let coverage = if let Some(high) = repair_high {
1447                QueryCoverage::FencedRepairChunk { high }
1448            } else {
1449                QueryCoverage::ScopedBaseRepair
1450            };
1451            (expr, coverage)
1452        };
1453
1454        let Some(expr) = expr else {
1455            // no new data, hence no need to update
1456            debug!("Flow id={:?}, no new data, not update", self.config.flow_id);
1457            return Ok(None);
1458        };
1459
1460        let filter_sql = expr_to_sql(&expr.expr)
1461            .map(|sql| sql.to_string())
1462            .unwrap_or_else(|err| format!("<failed to format filter expr: {err}>"));
1463
1464        debug!(
1465            "Flow id={:?}, Generated filter expr: {:?}",
1466            self.config.flow_id, filter_sql
1467        );
1468
1469        let mut add_filter = AddFilterRewriter::new(expr.expr.clone());
1470        let mut add_auto_column = ColumnMatcherRewriter::new(
1471            sink_table_schema.clone(),
1472            primary_key_indices.to_vec(),
1473            allow_partial,
1474        );
1475
1476        let plan = self.restore_scoped_dirty_windows_on_err(
1477            &expr,
1478            sql_to_df_plan(query_ctx.clone(), engine.clone(), &self.config.query, false).await,
1479        )?;
1480        let rewrite = self.restore_scoped_dirty_windows_on_err(
1481            &expr,
1482            plan.clone()
1483                .rewrite(&mut add_filter)
1484                .and_then(|p| p.data.rewrite(&mut add_auto_column))
1485                .with_context(|_| DatafusionSnafu {
1486                    context: format!("Failed to rewrite plan:\n {}\n", plan),
1487                })
1488                .map(|rewrite| rewrite.data),
1489        )?;
1490        // only apply optimize after complex rewrite is done
1491        let new_plan = self.restore_scoped_dirty_windows_on_err(
1492            &expr,
1493            apply_df_optimizer(rewrite, &query_ctx).await,
1494        )?;
1495
1496        let info = PlanInfo {
1497            plan: new_plan.clone(),
1498            dirty_restore: DirtyRestore::Scoped(expr),
1499            coverage,
1500        };
1501
1502        Ok(Some(info))
1503    }
1504}
1505
1506#[cfg(test)]
1507mod test;