1use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
16use std::sync::{Arc, RwLock};
17use std::time::{Duration, SystemTime, UNIX_EPOCH};
18
19use api::v1::{CreateTableExpr, TableName};
20use catalog::CatalogManagerRef;
21use common_error::ext::BoxedError;
22use common_query::logical_plan::breakup_insert_plan;
23use common_telemetry::tracing::warn;
24use common_telemetry::{debug, info};
25use common_time::Timestamp;
26use datafusion::datasource::DefaultTableSource;
27use datafusion::sql::unparser::expr_to_sql;
28use datafusion_common::tree_node::{Transformed, TreeNode};
29use datafusion_common::utils::quote_identifier;
30use datafusion_common::{DFSchemaRef, TableReference};
31use datafusion_expr::{DmlStatement, LogicalPlan, WriteOp, col, lit};
32use datatypes::schema::Schema;
33use query::QueryEngineRef;
34use query::options::FLOW_INCREMENTAL_MODE;
35use query::query_engine::DefaultSerializer;
36use session::context::QueryContextRef;
37use snafu::{OptionExt, ResultExt};
38use sql::parsers::utils::is_tql;
39use store_api::mito_engine_options::MERGE_MODE_KEY;
40use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
41use table::table::adapter::DfTableProviderAdapter;
42use tokio::sync::oneshot::error::TryRecvError;
43use tokio::sync::{Mutex, oneshot};
44use tokio::time::Instant;
45
46use crate::batching_mode::BatchingModeOptions;
47use crate::batching_mode::checkpoint::checkpoint_mode_label;
48use crate::batching_mode::eval_schedule::{EvalSchedule, select_due_scheduled_times};
49use crate::batching_mode::frontend_client::{FrontendClient, PeerDesc};
50use crate::batching_mode::state::{
51 CheckpointMode, DirtyTimeWindows, FilterExprInfo, TaskState, to_df_literal,
52};
53use crate::batching_mode::table_creator::{QueryType, create_table_with_expr};
54use crate::batching_mode::time_window::TimeWindowExpr;
55use crate::batching_mode::utils::{
56 AddFilterRewriter, ColumnMatcherRewriter, df_plan_to_sql, gen_plan_with_matching_schema,
57 get_table_info_df_schema, sql_to_df_plan,
58};
59use crate::df_optimizer::apply_df_optimizer;
60use crate::error::{
61 DatafusionSnafu, ExternalSnafu, InvalidQuerySnafu, SubstraitEncodeLogicalPlanSnafu,
62 UnexpectedSnafu,
63};
64use crate::metrics::{
65 METRIC_FLOW_BATCHING_ENGINE_ERROR_CNT, METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME,
66 METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY, METRIC_FLOW_BATCHING_ENGINE_START_QUERY_CNT,
67 METRIC_FLOW_ROWS,
68};
69use crate::{Error, FlowId};
70
71mod ckpt;
72mod inc;
73
/// Returns the current wall-clock time as whole seconds since the Unix epoch.
///
/// A system clock that reads earlier than the epoch yields `0` instead of an
/// error or a panic.
fn wall_clock_unix_secs() -> i64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs() as i64,
        // Same value the zero `Duration` default would produce.
        Err(_) => 0,
    }
}
81
82#[derive(Clone)]
84pub struct TaskConfig {
85 pub flow_id: FlowId,
86 pub query: String,
87 pub output_schema: DFSchemaRef,
89 pub time_window_expr: Option<TimeWindowExpr>,
90 pub expire_after: Option<i64>,
92 pub sink_table_name: [String; 3],
93 pub source_table_names: HashSet<[String; 3]>,
94 pub catalog_manager: CatalogManagerRef,
95 pub query_type: QueryType,
96 pub batch_opts: Arc<BatchingModeOptions>,
97 pub flow_eval_interval: Option<Duration>,
98 pub eval_schedule: Option<EvalSchedule>,
100}
101
102fn determine_query_type(query: &str, query_ctx: &QueryContextRef) -> Result<QueryType, Error> {
103 let is_tql = is_tql(query_ctx.sql_dialect(), query)
104 .map_err(BoxedError::new)
105 .context(ExternalSnafu)?;
106 Ok(if is_tql {
107 QueryType::Tql
108 } else {
109 QueryType::Sql
110 })
111}
112
113fn is_merge_mode_last_non_null(options: &HashMap<String, String>) -> bool {
114 options
115 .get(MERGE_MODE_KEY)
116 .map(|mode| mode.eq_ignore_ascii_case("last_non_null"))
117 .unwrap_or(false)
118}
119
120fn encode_insert_plan_request(
121 insert_to: TableName,
122 insert_input_plan: &LogicalPlan,
123) -> Result<api::v1::QueryRequest, Error> {
124 let message = DFLogicalSubstraitConvertor {}
125 .encode(insert_input_plan, DefaultSerializer)
126 .context(SubstraitEncodeLogicalPlanSnafu)?;
127 Ok(api::v1::QueryRequest {
128 query: Some(api::v1::query_request::Query::InsertIntoPlan(
129 api::v1::InsertIntoPlan {
130 table_name: Some(insert_to),
131 logical_plan: message.to_vec(),
132 },
133 )),
134 })
135}
136
137fn format_insert_target_columns(plan: &LogicalPlan) -> String {
138 plan.schema()
139 .fields()
140 .iter()
141 .map(|field| quote_identifier(field.name()).to_string())
142 .collect::<Vec<_>>()
143 .join(", ")
144}
145
146#[derive(Clone)]
147pub struct BatchingTask {
148 pub config: Arc<TaskConfig>,
149 pub state: Arc<RwLock<TaskState>>,
150 execution_lock: Arc<Mutex<()>>,
154}
155
156pub struct TaskArgs<'a> {
158 pub flow_id: FlowId,
159 pub query: &'a str,
160 pub plan: LogicalPlan,
161 pub time_window_expr: Option<TimeWindowExpr>,
162 pub expire_after: Option<i64>,
163 pub sink_table_name: [String; 3],
164 pub source_table_names: Vec<[String; 3]>,
165 pub query_ctx: QueryContextRef,
166 pub catalog_manager: CatalogManagerRef,
167 pub shutdown_rx: oneshot::Receiver<()>,
168 pub batch_opts: Arc<BatchingModeOptions>,
169 pub flow_eval_interval: Option<Duration>,
170 pub eval_schedule: Option<EvalSchedule>,
172}
173
174pub struct PlanInfo {
175 pub plan: LogicalPlan,
176 pub dirty_restore: DirtyRestore,
177 pub coverage: QueryCoverage,
178}
179
/// Kind of coverage a generated query provides.
#[derive(Clone)]
pub enum QueryCoverage {
    UnfilteredFull,
    ScopedBaseRepair,
    /// Carries the `high` map that is handed to the frontend as snapshot
    /// sequences. NOTE(review): key/value meaning (presumably region id ->
    /// sequence) is not visible here; confirm against the producer.
    FencedRepairChunk { high: BTreeMap<u64, u64> },
    IncrementalDelta,
}

impl QueryCoverage {
    /// Returns `true` only for [`QueryCoverage::IncrementalDelta`].
    fn is_incremental_delta(&self) -> bool {
        match self {
            Self::IncrementalDelta => true,
            Self::UnfilteredFull | Self::ScopedBaseRepair | Self::FencedRepairChunk { .. } => {
                false
            }
        }
    }

    /// Returns a copy of the `high` map of a fenced repair chunk, or an empty
    /// map for every other variant.
    fn snapshot_seqs(&self) -> HashMap<u64, u64> {
        let mut seqs = HashMap::new();
        if let Self::FencedRepairChunk { high } = self {
            seqs.extend(high.iter().map(|(key, seq)| (*key, *seq)));
        }
        seqs
    }
}
214
215pub enum DirtyRestore {
216 Scoped(FilterExprInfo),
219 Unscoped(DirtyTimeWindows),
226}
227
228struct ExecuteOnceOutcome {
229 new_query: Option<PlanInfo>,
230 result: Result<Option<(usize, Duration)>, Error>,
236}
237
238impl BatchingTask {
239 #[allow(clippy::too_many_arguments)]
240 pub fn try_new(
241 TaskArgs {
242 flow_id,
243 query,
244 plan,
245 time_window_expr,
246 expire_after,
247 sink_table_name,
248 source_table_names,
249 query_ctx,
250 catalog_manager,
251 shutdown_rx,
252 batch_opts,
253 flow_eval_interval,
254 eval_schedule,
255 }: TaskArgs<'_>,
256 ) -> Result<Self, Error> {
257 let mut state = TaskState::with_dirty_time_windows(
258 query_ctx.clone(),
259 shutdown_rx,
260 DirtyTimeWindows::new(
261 batch_opts.experimental_max_filter_num_per_query,
262 batch_opts.experimental_time_window_merge_threshold,
263 ),
264 );
265 if !batch_opts.experimental_enable_incremental_read {
266 state.disable_incremental();
267 }
268
269 Ok(Self {
270 config: Arc::new(TaskConfig {
271 flow_id,
272 query: query.to_string(),
273 time_window_expr,
274 expire_after,
275 sink_table_name,
276 source_table_names: source_table_names.into_iter().collect(),
277 catalog_manager,
278 output_schema: plan.schema().clone(),
279 query_type: determine_query_type(query, &query_ctx)?,
280 batch_opts,
281 flow_eval_interval,
282 eval_schedule,
283 }),
284 state: Arc::new(RwLock::new(state)),
285 execution_lock: Arc::new(Mutex::new(())),
286 })
287 }
288
289 pub fn last_execution_time_millis(&self) -> Option<i64> {
290 self.state.read().unwrap().last_execution_time_millis()
291 }
292
293 fn frontend_extensions(&self) -> HashMap<String, String> {
296 let ctx = self.state.read().unwrap();
297 let all = ctx.query_ctx.extensions();
298 let mut flow_exts = HashMap::new();
299 if let Some(v) = all.get(query::options::FLOW_SCHEDULED_TIME_MILLIS) {
302 flow_exts.insert(
303 query::options::FLOW_SCHEDULED_TIME_MILLIS.to_string(),
304 v.clone(),
305 );
306 }
307 flow_exts
308 }
309
310 pub fn mark_all_windows_as_dirty(&self) -> Result<(), Error> {
314 let now = SystemTime::now();
315 let now = Timestamp::new_second(
316 now.duration_since(UNIX_EPOCH)
317 .expect("Time went backwards")
318 .as_secs() as _,
319 );
320 let lower_bound = self
321 .config
322 .expire_after
323 .map(|e| now.sub_duration(Duration::from_secs(e as _)))
324 .transpose()
325 .map_err(BoxedError::new)
326 .context(ExternalSnafu)?
327 .unwrap_or(Timestamp::new_second(0));
328 debug!(
329 "Flow {} mark range ({:?}, {:?}) as dirty",
330 self.config.flow_id, lower_bound, now
331 );
332 self.state
333 .write()
334 .unwrap()
335 .dirty_time_windows
336 .add_window(lower_bound, Some(now));
337 Ok(())
338 }
339
340 pub async fn check_or_create_sink_table(
342 &self,
343 engine: &QueryEngineRef,
344 frontend_client: &Arc<FrontendClient>,
345 ) -> Result<Option<(usize, Duration)>, Error> {
346 if !self.is_table_exist(&self.config.sink_table_name).await? {
347 let create_table = self.gen_create_table_expr(engine.clone()).await?;
348 info!(
349 "Try creating sink table(if not exists) with expr: {:?}",
350 create_table
351 );
352 self.create_table(frontend_client, create_table).await?;
353 info!(
354 "Sink table {}(if not exists) created",
355 self.config.sink_table_name.join(".")
356 );
357 }
358
359 Ok(None)
360 }
361
362 pub async fn validate_sink_table_schema(&self, engine: &QueryEngineRef) -> Result<(), Error> {
368 let (table, _) = get_table_info_df_schema(
369 self.config.catalog_manager.clone(),
370 self.config.sink_table_name.clone(),
371 )
372 .await?;
373
374 let table_meta = &table.table_info().meta;
375 let merge_mode_last_non_null =
376 is_merge_mode_last_non_null(&table_meta.options.extra_options);
377 let primary_key_indices = table_meta.primary_key_indices.clone();
378 let query_ctx = self.state.read().unwrap().query_ctx.clone();
379
380 gen_plan_with_matching_schema(
381 &self.config.query,
382 query_ctx,
383 engine.clone(),
384 table_meta.schema.clone(),
385 &primary_key_indices,
386 merge_mode_last_non_null,
387 )
388 .await
389 .map(|_| ())
390 }
391
392 async fn is_table_exist(&self, table_name: &[String; 3]) -> Result<bool, Error> {
393 self.config
394 .catalog_manager
395 .table_exists(&table_name[0], &table_name[1], &table_name[2], None)
396 .await
397 .map_err(BoxedError::new)
398 .context(ExternalSnafu)
399 }
400
401 pub(crate) async fn execute_once_serialized(
402 &self,
403 engine: &QueryEngineRef,
404 frontend_client: &Arc<FrontendClient>,
405 max_window_cnt: Option<usize>,
406 ) -> Result<Option<(usize, Duration)>, Error> {
407 let outcome = self
408 .execute_once_serialized_with_outcome(engine, frontend_client, max_window_cnt)
409 .await;
410 outcome.result
411 }
412
413 async fn execute_once_serialized_with_outcome(
416 &self,
417 engine: &QueryEngineRef,
418 frontend_client: &Arc<FrontendClient>,
419 max_window_cnt: Option<usize>,
420 ) -> ExecuteOnceOutcome {
421 let _execution_guard = self.execution_lock.lock().await;
422 self.execute_once_unlocked(engine, frontend_client, max_window_cnt)
423 .await
424 }
425
426 async fn execute_once_unlocked(
428 &self,
429 engine: &QueryEngineRef,
430 frontend_client: &Arc<FrontendClient>,
431 max_window_cnt: Option<usize>,
432 ) -> ExecuteOnceOutcome {
433 let new_query = match self.gen_insert_plan_unlocked(engine, max_window_cnt).await {
434 Ok(new_query) => new_query,
435 Err(err) => {
436 return ExecuteOnceOutcome {
437 new_query: None,
438 result: Err(err),
439 };
440 }
441 };
442
443 if let Some(new_query) = new_query {
444 debug!("Generate new query: {}", new_query.plan);
445 let res = self
446 .execute_logical_plan_unlocked(
447 frontend_client,
448 &new_query.plan,
449 &new_query.dirty_restore,
450 &new_query.coverage,
451 )
452 .await;
453 if res.is_err() {
454 self.handle_executed_query_failure(Some(&new_query));
455 }
456 ExecuteOnceOutcome {
457 new_query: Some(new_query),
458 result: res,
459 }
460 } else {
461 debug!("Generate no query");
462 ExecuteOnceOutcome {
463 new_query: None,
464 result: Ok(None),
465 }
466 }
467 }
468
469 async fn gen_insert_plan_unlocked(
471 &self,
472 engine: &QueryEngineRef,
473 max_window_cnt: Option<usize>,
474 ) -> Result<Option<PlanInfo>, Error> {
475 let (table, df_schema) = get_table_info_df_schema(
476 self.config.catalog_manager.clone(),
477 self.config.sink_table_name.clone(),
478 )
479 .await?;
480
481 let table_meta = &table.table_info().meta;
482 let merge_mode_last_non_null =
483 is_merge_mode_last_non_null(&table_meta.options.extra_options);
484 let primary_key_indices = table_meta.primary_key_indices.clone();
485
486 let new_query = self
487 .gen_query_with_time_window(
488 engine.clone(),
489 &table.table_info().meta.schema,
490 &primary_key_indices,
491 merge_mode_last_non_null,
492 max_window_cnt,
493 )
494 .await?;
495
496 let Some(new_query) = new_query else {
497 return Ok(None);
498 };
499
500 let table_columns = df_schema
503 .columns()
504 .into_iter()
505 .map(|c| c.name)
506 .collect::<BTreeSet<_>>();
507 for column in new_query.plan.schema().columns() {
508 if !table_columns.contains(column.name()) {
509 self.restore_dirty_windows_after_failure(&new_query);
510 return InvalidQuerySnafu {
511 reason: format!(
512 "Column {} not found in sink table with columns {:?}",
513 column, table_columns
514 ),
515 }
516 .fail();
517 }
518 }
519
520 let table_provider = Arc::new(DfTableProviderAdapter::new(table));
521 let table_source = Arc::new(DefaultTableSource::new(table_provider));
522
523 let plan = LogicalPlan::Dml(DmlStatement::new(
525 datafusion_common::TableReference::Full {
526 catalog: self.config.sink_table_name[0].clone().into(),
527 schema: self.config.sink_table_name[1].clone().into(),
528 table: self.config.sink_table_name[2].clone().into(),
529 },
530 table_source,
531 WriteOp::Insert(datafusion_expr::dml::InsertOp::Append),
532 Arc::new(new_query.plan.clone()),
533 ));
534 let insert_into_info = PlanInfo {
535 plan,
536 dirty_restore: new_query.dirty_restore,
537 coverage: new_query.coverage,
538 };
539 let insert_into =
540 match insert_into_info
541 .plan
542 .clone()
543 .recompute_schema()
544 .context(DatafusionSnafu {
545 context: "Failed to recompute schema",
546 }) {
547 Ok(insert_into) => insert_into,
548 Err(err) => {
549 self.restore_dirty_windows_after_failure(&insert_into_info);
550 return Err(err);
551 }
552 };
553
554 Ok(Some(PlanInfo {
555 plan: insert_into,
556 dirty_restore: insert_into_info.dirty_restore,
557 coverage: insert_into_info.coverage,
558 }))
559 }
560
561 pub async fn create_table(
562 &self,
563 frontend_client: &Arc<FrontendClient>,
564 expr: CreateTableExpr,
565 ) -> Result<(), Error> {
566 let catalog = &self.config.sink_table_name[0];
567 let schema = &self.config.sink_table_name[1];
568 frontend_client
569 .create(expr.clone(), catalog, schema)
570 .await?;
571 Ok(())
572 }
573
574 async fn execute_logical_plan_unlocked(
576 &self,
577 frontend_client: &Arc<FrontendClient>,
578 plan: &LogicalPlan,
579 dirty_restore: &DirtyRestore,
580 coverage: &QueryCoverage,
581 ) -> Result<Option<(usize, Duration)>, Error> {
582 let instant = Instant::now();
583 let flow_id = self.config.flow_id;
584
585 debug!(
586 "Executing flow {flow_id}(expire_after={:?} secs) with query {}",
587 self.config.expire_after, &plan
588 );
589
590 let catalog = &self.config.sink_table_name[0];
591 let schema = &self.config.sink_table_name[1];
592
593 let plan = plan
595 .clone()
596 .transform_down_with_subqueries(|p| {
597 if let LogicalPlan::TableScan(mut table_scan) = p {
598 let resolved = table_scan.table_name.resolve(catalog, schema);
599 table_scan.table_name = resolved.into();
600 Ok(Transformed::yes(LogicalPlan::TableScan(table_scan)))
601 } else {
602 Ok(Transformed::no(p))
603 }
604 })
605 .with_context(|_| DatafusionSnafu {
606 context: format!("Failed to fix table ref in logical plan, plan={:?}", plan),
607 })?
608 .data;
609
610 let incremental_plan = if coverage.is_incremental_delta() {
613 self.prepare_plan_for_incremental(&plan).await?
614 } else {
615 None
616 };
617 let incremental_safe = incremental_plan.is_some();
618 if coverage.is_incremental_delta() && !incremental_safe {
619 warn!(
620 "Flow {flow_id} skipped unsafe incremental delta fallback; \
621 restored dirty signal instead of executing an unfiltered full snapshot"
622 );
623 self.restore_dirty_windows(dirty_restore);
624 return Ok(None);
625 }
626 let plan = incremental_plan.unwrap_or_else(|| plan.clone());
627
628 let extensions = self
629 .build_flow_query_extensions(incremental_safe, coverage.is_incremental_delta())
630 .await?;
631 let frontend_extensions = self.frontend_extensions();
632 let extension_refs = extensions
633 .iter()
634 .map(|(key, value)| (*key, value.as_str()))
635 .chain(
636 frontend_extensions
637 .iter()
638 .map(|(key, value)| (key.as_str(), value.as_str())),
639 )
640 .collect::<Vec<_>>();
641 let query_mode = if extensions
642 .iter()
643 .any(|(key, _)| *key == FLOW_INCREMENTAL_MODE)
644 {
645 CheckpointMode::Incremental
646 } else {
647 CheckpointMode::FullSnapshot
648 };
649 Self::record_query_mode(flow_id, query_mode);
650 debug!(
651 "Flow {flow_id} executing batching query with checkpoint_mode={}, extension_count={}",
652 checkpoint_mode_label(query_mode),
653 extensions.len()
654 );
655
656 let mut peer_desc = None;
657 let res = {
658 let _timer = METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME
659 .with_label_values(&[flow_id.to_string().as_str()])
660 .start_timer();
661
662 let req = if let Some((insert_to, insert_input_plan)) =
663 breakup_insert_plan(&plan, catalog, schema)
664 {
665 if query_mode == CheckpointMode::FullSnapshot
666 && matches!(self.config.query_type, QueryType::Sql)
667 && self.config.flow_eval_interval.is_some()
668 && self.config.time_window_expr.is_none()
669 {
670 match df_plan_to_sql(&insert_input_plan) {
680 Ok(select_sql) => {
681 let target_columns = format_insert_target_columns(&insert_input_plan);
682 let sql = format!(
683 "INSERT INTO {} ({}) {}",
684 TableReference::full(
685 insert_to.catalog_name.as_str(),
686 insert_to.schema_name.as_str(),
687 insert_to.table_name.as_str(),
688 )
689 .to_quoted_string(),
690 target_columns,
691 select_sql
692 );
693 api::v1::QueryRequest {
694 query: Some(api::v1::query_request::Query::Sql(sql)),
695 }
696 }
697 Err(err) => {
698 warn!(
699 "Failed to unparse full-snapshot SQL flow {} plan; \
700 falling back to InsertIntoPlan: {:?}",
701 flow_id, err
702 );
703 encode_insert_plan_request(insert_to, &insert_input_plan)?
704 }
705 }
706 } else {
707 encode_insert_plan_request(insert_to, &insert_input_plan)?
708 }
709 } else {
710 let message = DFLogicalSubstraitConvertor {}
711 .encode(&plan, DefaultSerializer)
712 .context(SubstraitEncodeLogicalPlanSnafu)?;
713
714 api::v1::QueryRequest {
715 query: Some(api::v1::query_request::Query::LogicalPlan(message.to_vec())),
716 }
717 };
718
719 let snapshot_seqs = coverage.snapshot_seqs();
720 frontend_client
721 .query_with_terminal_metrics(
722 catalog,
723 schema,
724 req,
725 &extension_refs,
726 &snapshot_seqs,
727 &mut peer_desc,
728 )
729 .await
730 };
731
732 let elapsed = instant.elapsed();
733 let peer_label = peer_desc
734 .as_ref()
735 .map(ToString::to_string)
736 .unwrap_or_else(|| PeerDesc::default().to_string());
737 if let Err(err) = &res {
738 warn!(
739 "Failed to execute Flow {flow_id} on frontend {peer_label}, result: {err:?}, elapsed: {:?} with query: {}",
740 elapsed, &plan
741 );
742 let decision = {
743 let mut state = self.state.write().unwrap();
744 let reason = Self::query_failure_reason(err, coverage);
745 Self::apply_query_failure_to_state(&mut state, elapsed, coverage, reason)
746 };
747 if let Some(decision) = decision {
748 Self::record_checkpoint_decision(flow_id, decision);
749 }
750 }
751
752 if elapsed >= self.config.batch_opts.slow_query_threshold {
754 warn!(
755 "Flow {flow_id} on frontend {peer_label} executed for {:?} before complete, query: {}",
756 elapsed, &plan
757 );
758 let flow_id = flow_id.to_string();
759 METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY
760 .with_label_values(&[flow_id.as_str(), peer_label.as_str()])
761 .observe(elapsed.as_secs_f64());
762 }
763
764 let res = res?;
765 let (affected_rows, _) = res.output.extract_rows_and_cost();
766 debug!(
767 "Flow {flow_id} executed, affected_rows: {affected_rows:?}, elapsed: {:?}, watermark: {:?}",
768 elapsed,
769 res.region_watermark_map()
770 );
771 METRIC_FLOW_ROWS
772 .with_label_values(&[format!("{}-out-batching", flow_id).as_str()])
773 .inc_by(affected_rows as _);
774 let decision = {
775 let mut state = self.state.write().unwrap();
776 Self::apply_query_result_to_state(&mut state, &res, elapsed, coverage)
777 };
778 Self::record_checkpoint_decision(flow_id, decision);
779
780 Ok(Some((affected_rows, elapsed)))
781 }
782
783 fn restore_dirty_windows(&self, dirty_restore: &DirtyRestore) {
787 match dirty_restore {
788 DirtyRestore::Scoped(filter) => self.restore_scoped_dirty_windows(filter),
789 DirtyRestore::Unscoped(dirty_windows) => self
790 .state
791 .write()
792 .unwrap()
793 .dirty_time_windows
794 .add_dirty_windows(dirty_windows),
795 }
796 }
797
798 fn restore_dirty_windows_after_failure(&self, query: &PlanInfo) {
801 self.restore_dirty_windows(&query.dirty_restore);
802 }
803
804 fn restore_scoped_dirty_windows(&self, filter: &FilterExprInfo) {
807 self.state.write().unwrap().restore_scoped_windows(filter);
808 }
809
810 fn restore_scoped_dirty_windows_on_err<T>(
813 &self,
814 filter: &FilterExprInfo,
815 result: Result<T, Error>,
816 ) -> Result<T, Error> {
817 result.inspect_err(|_| {
818 self.restore_scoped_dirty_windows(filter);
819 })
820 }
821
822 fn restore_unscoped_dirty_windows(&self, dirty_windows: &DirtyTimeWindows) {
825 self.state
826 .write()
827 .unwrap()
828 .dirty_time_windows
829 .add_dirty_windows(dirty_windows);
830 }
831
832 fn restore_unscoped_dirty_windows_on_err<T>(
835 &self,
836 dirty_windows: &DirtyTimeWindows,
837 result: Result<T, Error>,
838 ) -> Result<T, Error> {
839 result.inspect_err(|_| {
840 self.restore_unscoped_dirty_windows(dirty_windows);
841 })
842 }
843
844 fn drain_dirty_windows_signal(&self) -> (bool, DirtyTimeWindows) {
847 let mut state = self.state.write().unwrap();
848 let dirty_windows_to_restore = state.dirty_time_windows.clone();
849 let is_dirty = !dirty_windows_to_restore.is_empty();
850 state.dirty_time_windows.clean();
851 (is_dirty, dirty_windows_to_restore)
852 }
853
854 #[allow(clippy::too_many_arguments)]
855 async fn gen_unfiltered_plan_info(
858 &self,
859 engine: QueryEngineRef,
860 query_ctx: QueryContextRef,
861 sink_table_schema: Arc<Schema>,
862 primary_key_indices: &[usize],
863 allow_partial: bool,
864 dirty_windows_to_restore: DirtyTimeWindows,
865 retention_filter: Option<(&str, Timestamp, &'static str)>,
866 coverage: QueryCoverage,
867 ) -> Result<PlanInfo, Error> {
868 let mut plan = self.restore_unscoped_dirty_windows_on_err(
869 &dirty_windows_to_restore,
870 gen_plan_with_matching_schema(
871 &self.config.query,
872 query_ctx,
873 engine,
874 sink_table_schema,
875 primary_key_indices,
876 allow_partial,
877 )
878 .await,
879 )?;
880
881 if let Some((col_name, lower_bound, context)) = retention_filter {
882 let lower = self.restore_unscoped_dirty_windows_on_err(
883 &dirty_windows_to_restore,
884 to_df_literal(lower_bound),
885 )?;
886 let retention_filter = col(col_name).gt_eq(lit(lower));
887 let mut add_filter = AddFilterRewriter::new(retention_filter);
888 plan = self.restore_unscoped_dirty_windows_on_err(
889 &dirty_windows_to_restore,
890 plan.clone()
891 .rewrite(&mut add_filter)
892 .with_context(|_| DatafusionSnafu {
893 context: format!(
894 "Failed to apply {context} expire_after filter to plan:\n {}\n",
895 plan
896 ),
897 })
898 .map(|rewrite| rewrite.data),
899 )?;
900 }
901
902 Ok(PlanInfo {
903 plan,
904 dirty_restore: DirtyRestore::Unscoped(dirty_windows_to_restore),
905 coverage,
906 })
907 }
908
909 #[allow(clippy::too_many_arguments)]
910 async fn gen_unfiltered_plan_info_if_dirty(
913 &self,
914 engine: QueryEngineRef,
915 query_ctx: QueryContextRef,
916 sink_table_schema: Arc<Schema>,
917 primary_key_indices: &[usize],
918 allow_partial: bool,
919 retention_filter: Option<(&str, Timestamp, &'static str)>,
920 coverage: QueryCoverage,
921 ) -> Result<Option<PlanInfo>, Error> {
922 let (is_dirty, dirty_windows_to_restore) = self.drain_dirty_windows_signal();
923 if !is_dirty {
924 debug!("Flow id={:?}, no new data, not update", self.config.flow_id);
925 return Ok(None);
926 }
927
928 self.gen_unfiltered_plan_info(
929 engine,
930 query_ctx,
931 sink_table_schema,
932 primary_key_indices,
933 allow_partial,
934 dirty_windows_to_restore,
935 retention_filter,
936 coverage,
937 )
938 .await
939 .map(Some)
940 }
941
942 fn handle_executed_query_failure(&self, query: Option<&PlanInfo>) {
943 if let Some(query) = query {
944 self.restore_dirty_windows_after_failure(query);
945 }
946 }
947
948 pub async fn start_executing_loop(
956 &self,
957 engine: QueryEngineRef,
958 frontend_client: Arc<FrontendClient>,
959 ) {
960 if self.config.flow_eval_interval.is_some() {
961 self.start_scheduled_loop(engine, frontend_client).await;
962 } else {
963 self.start_adaptive_loop(engine, frontend_client).await;
964 }
965 }
966
967 async fn start_scheduled_loop(
977 &self,
978 engine: QueryEngineRef,
979 frontend_client: Arc<FrontendClient>,
980 ) {
981 let flow_id_str = self.config.flow_id.to_string();
982
983 let schedule = match &self.config.eval_schedule {
984 Some(s) => s.clone(),
985 None => {
986 let eval_interval_secs = self
987 .config
988 .flow_eval_interval
989 .map(|d| d.as_secs() as i64)
990 .expect("checked by caller");
991
992 match EvalSchedule::from_config(Some(eval_interval_secs), None) {
995 Ok(Some(s)) => s,
996 Ok(None) => {
997 warn!(
998 "Flow {}: EVAL INTERVAL set but no schedule parsed; exiting loop",
999 flow_id_str
1000 );
1001 return;
1002 }
1003 Err(e) => {
1004 warn!(
1005 "Flow {}: Failed to parse eval schedule: {}; exiting loop",
1006 flow_id_str, e
1007 );
1008 return;
1009 }
1010 }
1011 }
1012 };
1013
1014 let mut cursor_secs = schedule.start_secs.saturating_sub(schedule.interval_secs);
1017
1018 info!(
1019 "Flow {}: entering scheduled loop, interval={}s, start={}, anchor={}, policy={:?}, max_runs={}, max_lag={}s",
1020 flow_id_str,
1021 schedule.interval_secs,
1022 schedule.start_secs,
1023 schedule.anchor_secs,
1024 schedule.missed_tick_policy,
1025 schedule.max_runs,
1026 schedule.max_lag_secs,
1027 );
1028
1029 loop {
1030 if self.is_shutdown_signaled() {
1031 break;
1032 }
1033
1034 let wall_now_secs = wall_clock_unix_secs();
1035
1036 let due = match select_due_scheduled_times(&schedule, cursor_secs, wall_now_secs) {
1037 Some(d) => d,
1038 None => {
1039 warn!(
1040 "Flow {}: Invalid schedule (interval <= 0), exiting loop",
1041 flow_id_str
1042 );
1043 return;
1044 }
1045 };
1046
1047 if due.scheduled_times_secs.is_empty() {
1048 if due.skipped > 0 {
1049 warn!(
1050 "Flow {}: all {} due scheduled times skipped by max-lag, advancing cursor to wall-clock ({wall_now_secs}) to avoid re-skipping",
1051 flow_id_str, due.skipped
1052 );
1053 cursor_secs = wall_now_secs;
1054 continue;
1055 }
1056
1057 let next = schedule.next_scheduled_time_after(cursor_secs);
1059 if next <= wall_now_secs {
1060 cursor_secs = wall_now_secs;
1063 continue;
1064 }
1065 let wait_secs = (next - wall_now_secs) as u64;
1066 let wait_dur = Duration::from_secs(wait_secs);
1067 debug!(
1068 "Flow {}: no due scheduled times, sleeping for {}s until next scheduled time at {}",
1069 flow_id_str, wait_secs, next
1070 );
1071 tokio::time::sleep(wait_dur).await;
1072 continue;
1073 }
1074
1075 if due.skipped > 0 {
1076 info!(
1077 "Flow {}: {} due scheduled times, {} skipped (catch-up)",
1078 flow_id_str,
1079 due.scheduled_times_secs.len(),
1080 due.skipped
1081 );
1082 }
1083
1084 for scheduled_time_secs in &due.scheduled_times_secs {
1086 if self.is_shutdown_signaled() {
1087 break;
1088 }
1089
1090 METRIC_FLOW_BATCHING_ENGINE_START_QUERY_CNT
1091 .with_label_values(&[&flow_id_str])
1092 .inc();
1093
1094 let outcome = self
1095 .execute_once_serialized_at_scheduled_time(
1096 &engine,
1097 &frontend_client,
1098 *scheduled_time_secs,
1099 )
1100 .await;
1101
1102 cursor_secs = *scheduled_time_secs;
1104
1105 match outcome.result {
1106 Ok(Some((rows, elapsed))) => {
1107 debug!(
1108 "Flow {}: scheduled time {} completed, rows={}, elapsed={:?}",
1109 flow_id_str, scheduled_time_secs, rows, elapsed
1110 );
1111 }
1112 Ok(None) => {
1113 debug!(
1114 "Flow {}: scheduled time {} produced no query (no dirty signal or no-op)",
1115 flow_id_str, scheduled_time_secs
1116 );
1117 }
1118 Err(err) => {
1119 warn!(
1120 "Flow {}: scheduled time {} failed: {:?}",
1121 flow_id_str, scheduled_time_secs, err
1122 );
1123 METRIC_FLOW_BATCHING_ENGINE_ERROR_CNT
1124 .with_label_values(&[&flow_id_str])
1125 .inc();
1126 }
1130 }
1131 }
1132 }
1133 }
1134
1135 async fn start_adaptive_loop(
1137 &self,
1138 engine: QueryEngineRef,
1139 frontend_client: Arc<FrontendClient>,
1140 ) {
1141 let flow_id_str = self.config.flow_id.to_string();
1142 let mut max_window_cnt = None;
1143 loop {
1144 if self.is_shutdown_signaled() {
1145 break;
1146 }
1147 METRIC_FLOW_BATCHING_ENGINE_START_QUERY_CNT
1148 .with_label_values(&[&flow_id_str])
1149 .inc();
1150
1151 let min_refresh = self.config.batch_opts.experimental_min_refresh_duration;
1152
1153 let outcome = self
1154 .execute_once_serialized_with_outcome(&engine, &frontend_client, max_window_cnt)
1155 .await;
1156
1157 match outcome.result {
1158 Ok(Some(_)) => {
1159 max_window_cnt = max_window_cnt.map(|cnt| {
1160 (cnt + 1).min(self.config.batch_opts.experimental_max_filter_num_per_query)
1161 });
1162
1163 let sleep_until = {
1164 let state = self.state.write().unwrap();
1165
1166 let time_window_size = self
1167 .config
1168 .time_window_expr
1169 .as_ref()
1170 .and_then(|t| *t.time_window_size());
1171
1172 let prefer_short_incremental_cadence = state.checkpoint_mode()
1173 == CheckpointMode::Incremental
1174 && !state.is_incremental_disabled();
1175
1176 state.get_next_start_query_time(
1177 self.config.flow_id,
1178 &time_window_size,
1179 min_refresh,
1180 Some(self.config.batch_opts.query_timeout),
1181 self.config.batch_opts.experimental_max_filter_num_per_query,
1182 prefer_short_incremental_cadence,
1183 )
1184 };
1185
1186 tokio::time::sleep_until(sleep_until).await;
1187 }
1188 Ok(None) => {
1189 debug!(
1190 "Flow id = {:?} found no new data, sleep for {:?} then continue",
1191 self.config.flow_id, min_refresh
1192 );
1193 tokio::time::sleep(min_refresh).await;
1194 continue;
1195 }
1196 Err(err) => {
1197 METRIC_FLOW_BATCHING_ENGINE_ERROR_CNT
1198 .with_label_values(&[&flow_id_str])
1199 .inc();
1200 match outcome.new_query {
1201 Some(query) => {
1202 common_telemetry::error!(err; "Failed to execute query for flow={} with query: {}", self.config.flow_id, query.plan);
1203 max_window_cnt = Some(1);
1204 }
1205 None => {
1206 common_telemetry::error!(err; "Failed to generate query for flow={}", self.config.flow_id)
1207 }
1208 }
1209 tokio::time::sleep(min_refresh).await;
1210 }
1211 }
1212 }
1213 }
1214
1215 fn is_shutdown_signaled(&self) -> bool {
1217 let mut state = self.state.write().unwrap();
1218 match state.shutdown_rx.try_recv() {
1219 Ok(()) | Err(TryRecvError::Closed) => true,
1220 Err(TryRecvError::Empty) => false,
1221 }
1222 }
1223
1224 async fn execute_once_serialized_at_scheduled_time(
1231 &self,
1232 engine: &QueryEngineRef,
1233 frontend_client: &Arc<FrontendClient>,
1234 scheduled_time_secs: i64,
1235 ) -> ExecuteOnceOutcome {
1236 let _execution_guard = self.execution_lock.lock().await;
1237
1238 struct QueryContextRestoreGuard {
1239 state: Arc<RwLock<TaskState>>,
1240 old_ctx: Option<QueryContextRef>,
1241 }
1242
1243 impl Drop for QueryContextRestoreGuard {
1244 fn drop(&mut self) {
1245 let Some(old_ctx) = self.old_ctx.take() else {
1246 return;
1247 };
1248 if let Ok(mut state) = self.state.write() {
1249 state.query_ctx = old_ctx;
1250 }
1251 }
1252 }
1253
1254 let old_ctx = {
1257 let mut state = self.state.write().unwrap();
1258 let old = state.query_ctx.clone();
1259 let mut new_ctx = (*old).clone();
1260 new_ctx.set_extension(
1261 query::options::FLOW_SCHEDULED_TIME_MILLIS,
1262 (scheduled_time_secs.saturating_mul(1000)).to_string(),
1263 );
1264 state.query_ctx = Arc::new(new_ctx);
1265 old
1266 };
1267 let restore_guard = QueryContextRestoreGuard {
1268 state: self.state.clone(),
1269 old_ctx: Some(old_ctx),
1270 };
1271
1272 let outcome = self
1273 .execute_once_unlocked(engine, frontend_client, None)
1274 .await;
1275
1276 drop(restore_guard);
1280
1281 outcome
1282 }
1283
1284 async fn gen_create_table_expr(
1291 &self,
1292 engine: QueryEngineRef,
1293 ) -> Result<CreateTableExpr, Error> {
1294 let query_ctx = self.state.read().unwrap().query_ctx.clone();
1295 let plan =
1296 sql_to_df_plan(query_ctx.clone(), engine.clone(), &self.config.query, true).await?;
1297 create_table_with_expr(&plan, &self.config.sink_table_name, &self.config.query_type)
1298 }
1299
1300 fn should_use_unfiltered_incremental_delta(&self) -> bool {
1303 let state = self.state.read().unwrap();
1304 state.checkpoint_mode() == CheckpointMode::Incremental
1305 && !state.is_incremental_disabled()
1306 && matches!(self.config.query_type, QueryType::Sql)
1307 }
1308
1309 async fn gen_query_with_time_window(
1312 &self,
1313 engine: QueryEngineRef,
1314 sink_table_schema: &Arc<Schema>,
1315 primary_key_indices: &[usize],
1316 allow_partial: bool,
1317 max_window_cnt: Option<usize>,
1318 ) -> Result<Option<PlanInfo>, Error> {
1319 let query_ctx = self.state.read().unwrap().query_ctx.clone();
1320 let start = SystemTime::now();
1321 let since_the_epoch = start
1322 .duration_since(UNIX_EPOCH)
1323 .expect("Time went backwards");
1324 let low_bound = self
1325 .config
1326 .expire_after
1327 .map(|e| since_the_epoch.as_secs() - e as u64)
1328 .unwrap_or(u64::MIN);
1329
1330 let low_bound = Timestamp::new_second(low_bound as i64);
1331
1332 let expire_time_window_bound = self
1333 .config
1334 .time_window_expr
1335 .as_ref()
1336 .map(|expr| expr.eval(low_bound))
1337 .transpose()?;
1338
1339 let (expire_lower_bound, expire_upper_bound) = match (
1340 expire_time_window_bound,
1341 &self.config.query_type,
1342 ) {
1343 (Some((Some(l), Some(u))), QueryType::Sql) => (l, u),
1344 (None, QueryType::Sql) if self.config.flow_eval_interval.is_none() => {
1345 return UnexpectedSnafu {
1346 reason: format!(
1347 "Flow id={} reached execution without a time-window expression or EVAL INTERVAL; create-flow validation should have rejected it",
1348 self.config.flow_id
1349 ),
1350 }
1351 .fail();
1352 }
1353 _ => {
1354 let (_, dirty_windows_to_restore) = self.drain_dirty_windows_signal();
1360
1361 let plan_info = self
1362 .gen_unfiltered_plan_info(
1363 engine,
1364 query_ctx,
1365 sink_table_schema.clone(),
1366 primary_key_indices,
1367 allow_partial,
1368 dirty_windows_to_restore,
1369 None,
1370 QueryCoverage::UnfilteredFull,
1371 )
1372 .await?;
1373
1374 return Ok(Some(plan_info));
1375 }
1376 };
1377
1378 debug!(
1379 "Flow id = {:?}, found time window: precise_lower_bound={:?}, precise_upper_bound={:?} with dirty time windows: {:?}",
1380 self.config.flow_id,
1381 expire_lower_bound,
1382 expire_upper_bound,
1383 self.state.read().unwrap().dirty_time_windows
1384 );
1385 let window_size = expire_upper_bound
1386 .sub(&expire_lower_bound)
1387 .with_context(|| UnexpectedSnafu {
1388 reason: format!(
1389 "Can't get window size from {expire_upper_bound:?} - {expire_lower_bound:?}"
1390 ),
1391 })?;
1392 let col_name = self
1393 .config
1394 .time_window_expr
1395 .as_ref()
1396 .map(|expr| expr.column_name.clone())
1397 .with_context(|| UnexpectedSnafu {
1398 reason: format!(
1399 "Flow id={:?}, Failed to get column name from time window expr",
1400 self.config.flow_id
1401 ),
1402 })?;
1403
1404 if self.should_use_unfiltered_incremental_delta() {
1405 let retention_filter = self
1415 .config
1416 .expire_after
1417 .map(|_| (col_name.as_str(), expire_lower_bound, "incremental"));
1418 return self
1419 .gen_unfiltered_plan_info_if_dirty(
1420 engine,
1421 query_ctx,
1422 sink_table_schema.clone(),
1423 primary_key_indices,
1424 allow_partial,
1425 retention_filter,
1426 QueryCoverage::IncrementalDelta,
1427 )
1428 .await;
1429 }
1430
1431 let (expr, coverage) = {
1432 let mut state = self.state.write().unwrap();
1433 let window_cnt = max_window_cnt
1434 .unwrap_or(self.config.batch_opts.experimental_max_filter_num_per_query);
1435 let expr = state.gen_scoped_filter_exprs(
1436 &col_name,
1437 Some(expire_lower_bound),
1438 window_size,
1439 window_cnt,
1440 self.config.flow_id,
1441 Some(self),
1442 )?;
1443 let repair_high = state
1444 .pending_fenced_repair()
1445 .map(|repair| repair.high().clone());
1446 let coverage = if let Some(high) = repair_high {
1447 QueryCoverage::FencedRepairChunk { high }
1448 } else {
1449 QueryCoverage::ScopedBaseRepair
1450 };
1451 (expr, coverage)
1452 };
1453
1454 let Some(expr) = expr else {
1455 debug!("Flow id={:?}, no new data, not update", self.config.flow_id);
1457 return Ok(None);
1458 };
1459
1460 let filter_sql = expr_to_sql(&expr.expr)
1461 .map(|sql| sql.to_string())
1462 .unwrap_or_else(|err| format!("<failed to format filter expr: {err}>"));
1463
1464 debug!(
1465 "Flow id={:?}, Generated filter expr: {:?}",
1466 self.config.flow_id, filter_sql
1467 );
1468
1469 let mut add_filter = AddFilterRewriter::new(expr.expr.clone());
1470 let mut add_auto_column = ColumnMatcherRewriter::new(
1471 sink_table_schema.clone(),
1472 primary_key_indices.to_vec(),
1473 allow_partial,
1474 );
1475
1476 let plan = self.restore_scoped_dirty_windows_on_err(
1477 &expr,
1478 sql_to_df_plan(query_ctx.clone(), engine.clone(), &self.config.query, false).await,
1479 )?;
1480 let rewrite = self.restore_scoped_dirty_windows_on_err(
1481 &expr,
1482 plan.clone()
1483 .rewrite(&mut add_filter)
1484 .and_then(|p| p.data.rewrite(&mut add_auto_column))
1485 .with_context(|_| DatafusionSnafu {
1486 context: format!("Failed to rewrite plan:\n {}\n", plan),
1487 })
1488 .map(|rewrite| rewrite.data),
1489 )?;
1490 let new_plan = self.restore_scoped_dirty_windows_on_err(
1492 &expr,
1493 apply_df_optimizer(rewrite, &query_ctx).await,
1494 )?;
1495
1496 let info = PlanInfo {
1497 plan: new_plan.clone(),
1498 dirty_restore: DirtyRestore::Scoped(expr),
1499 coverage,
1500 };
1501
1502 Ok(Some(info))
1503 }
1504}
1505
1506#[cfg(test)]
1507mod test;