use std::collections::{BTreeMap, HashMap, HashSet};
use std::sync::Arc;
use std::time::Duration;

use api::v1::flow::DirtyWindowRequests;
use catalog::CatalogManagerRef;
use common_error::ext::BoxedError;
use common_meta::ddl::create_flow::FlowType;
use common_meta::key::TableMetadataManagerRef;
use common_meta::key::flow::FlowMetadataManagerRef;
use common_meta::key::table_info::{TableInfoManager, TableInfoValue};
use common_runtime::JoinHandle;
use common_telemetry::{debug, info, warn};
use common_time::TimeToLive;
use datafusion_common::tree_node::{TreeNodeRecursion, TreeNodeVisitor};
use datafusion_expr::LogicalPlan;
use datatypes::prelude::ConcreteDataType;
use query::QueryEngineRef;
use session::context::QueryContext;
use snafu::{OptionExt, ResultExt, ensure};
use sql::parsers::utils::is_tql;
use store_api::storage::{RegionId, TableId};
use table::table_reference::TableReference;
use tokio::sync::{RwLock, oneshot};

use crate::batching_mode::BatchingModeOptions;
use crate::batching_mode::frontend_client::FrontendClient;
use crate::batching_mode::task::{BatchingTask, TaskArgs};
use crate::batching_mode::time_window::{TimeWindowExpr, find_time_window_expr};
use crate::batching_mode::utils::sql_to_df_plan;
use crate::engine::FlowEngine;
use crate::error::{
    CreateFlowSnafu, DatafusionSnafu, ExternalSnafu, FlowAlreadyExistSnafu, FlowNotFoundSnafu,
    InvalidQuerySnafu, TableNotFoundMetaSnafu, UnexpectedSnafu, UnsupportedSnafu,
};
use crate::metrics::METRIC_FLOW_BATCHING_ENGINE_BULK_MARK_TIME_WINDOW;
use crate::{CreateFlowArgs, Error, FlowId, TableName};

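/// Flow engine that runs flows in batching mode: each flow is compiled into a
/// [`BatchingTask`] whose executing loop re-runs the flow's query over the
/// time windows marked dirty by incoming writes.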
pub struct BatchingEngine {
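    /// Running batching tasks, keyed by flow id.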
    tasks: RwLock<BTreeMap<FlowId, BatchingTask>>,
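    /// Sender half of each task's shutdown channel, keyed by flow id.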
    shutdown_txs: RwLock<BTreeMap<FlowId, oneshot::Sender<()>>>,
    pub(crate) frontend_client: Arc<FrontendClient>,
    flow_metadata_manager: FlowMetadataManagerRef,
    table_meta: TableMetadataManagerRef,
    catalog_manager: CatalogManagerRef,
    query_engine: QueryEngineRef,
    pub(crate) batch_opts: Arc<BatchingModeOptions>,
}

impl BatchingEngine {
    pub fn new(
        frontend_client: Arc<FrontendClient>,
        query_engine: QueryEngineRef,
        flow_metadata_manager: FlowMetadataManagerRef,
        table_meta: TableMetadataManagerRef,
        catalog_manager: CatalogManagerRef,
        batch_opts: BatchingModeOptions,
    ) -> Self {
        Self {
            tasks: Default::default(),
            shutdown_txs: Default::default(),
            frontend_client,
            flow_metadata_manager,
            table_meta,
            catalog_manager,
            query_engine,
            batch_opts: Arc::new(batch_opts),
        }
    }

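    /// Mark as dirty every time window touched by the given requests.
    ///
    /// Timestamps are grouped by source table; for each task reading one of
    /// those tables, every timestamp is aligned to the start of its time
    /// window (e.g. with a 5-minute window, a write at 12:03 marks the window
    /// starting at 12:00). Tasks without a time window expression are marked
    /// entirely dirty.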
    pub async fn handle_mark_dirty_time_window(
        &self,
        reqs: DirtyWindowRequests,
    ) -> Result<(), Error> {
        let table_info_mgr = self.table_meta.table_info_manager();

        let mut group_by_table_id: HashMap<TableId, Vec<_>> = HashMap::new();
        for r in reqs.requests {
            let tid = TableId::from(r.table_id);
            let entry = group_by_table_id.entry(tid).or_default();
            entry.extend(r.timestamps);
        }
        let tids = group_by_table_id.keys().copied().collect::<Vec<TableId>>();
        let table_infos = table_info_mgr
            .batch_get(&tids)
            .await
            .with_context(|_| TableNotFoundMetaSnafu {
                msg: format!("Failed to get table info for table ids: {:?}", tids),
            })?;

        let group_by_table_name = group_by_table_id
            .into_iter()
            .filter_map(|(id, timestamps)| {
                let Some(info) = table_infos.get(&id) else {
                    warn!("Failed to get table info for table id: {:?}", id);
                    return None;
                };
                let table_name = info.table_name();
                let table_name = [
                    table_name.catalog_name,
                    table_name.schema_name,
                    table_name.table_name,
                ];
                // Look up the time index unit without panicking on tables that
                // lack a valid time index.
                let schema = &info.table_info.meta.schema;
                let Some(time_index_unit) = schema
                    .timestamp_index
                    .and_then(|idx| schema.column_schemas.get(idx))
                    .and_then(|col| col.data_type.as_timestamp())
                    .map(|ts_type| ts_type.unit())
                else {
                    warn!("Table id = {:?} has no valid time index, skipping", id);
                    return None;
                };
                Some((table_name, (timestamps, time_index_unit)))
            })
            .collect::<HashMap<_, _>>();

        let group_by_table_name = Arc::new(group_by_table_name);

        let mut handles = Vec::new();
        let tasks = self.tasks.read().await;

        for task in tasks.values() {
            let src_table_names = &task.config.source_table_names;

            // Skip tasks that read none of the dirty tables.
            if src_table_names
                .iter()
                .all(|name| !group_by_table_name.contains_key(name))
            {
                continue;
            }

            let group_by_table_name = group_by_table_name.clone();
            let task = task.clone();

            let handle: JoinHandle<Result<(), Error>> = tokio::spawn(async move {
                let src_table_names = &task.config.source_table_names;
                let mut all_dirty_windows = HashSet::new();
                let mut is_dirty = false;
                for src_table_name in src_table_names {
                    if let Some((timestamps, unit)) = group_by_table_name.get(src_table_name) {
                        // Without a time window expression we can't tell which
                        // windows are affected, so mark the whole task dirty.
                        let Some(expr) = &task.config.time_window_expr else {
                            is_dirty = true;
                            continue;
                        };
                        for timestamp in timestamps {
                            // Align the timestamp to the start of its window.
                            let align_start = expr
                                .eval(common_time::Timestamp::new(*timestamp, *unit))?
                                .0
                                .context(UnexpectedSnafu {
                                    reason: "Failed to eval start value",
                                })?;
                            all_dirty_windows.insert(align_start);
                        }
                    }
                }
                let mut state = task.state.write().unwrap();
                if is_dirty {
                    state.dirty_time_windows.set_dirty();
                }
                let flow_id_label = task.config.flow_id.to_string();
                for timestamp in all_dirty_windows {
                    state.dirty_time_windows.add_window(timestamp, None);
                }

                METRIC_FLOW_BATCHING_ENGINE_BULK_MARK_TIME_WINDOW
                    .with_label_values(&[&flow_id_label])
                    .set(state.dirty_time_windows.len() as f64);
                Ok(())
            });
            handles.push(handle);
        }
        // Release the read lock before awaiting the spawned tasks.
        drop(tasks);
        for handle in handles {
            match handle.await {
                Err(e) => {
                    warn!("Failed to mark dirty time windows: {e}");
                }
                Ok(Ok(())) => (),
                Ok(Err(e)) => {
                    warn!("Failed to mark dirty time windows: {e}");
                }
            }
        }

        Ok(())
    }

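    /// Mark dirty time windows for every task whose source tables receive rows
    /// in `request`, deriving the affected windows from the inserted rows via
    /// each task's time window expression.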
    pub async fn handle_inserts_inner(
        &self,
        request: api::v1::region::InsertRequests,
    ) -> Result<(), Error> {
        let table_info_mgr = self.table_meta.table_info_manager();
        let mut group_by_table_id: HashMap<TableId, Vec<api::v1::Rows>> = HashMap::new();

        for r in request.requests {
            let tid = RegionId::from(r.region_id).table_id();
            let entry = group_by_table_id.entry(tid).or_default();
            if let Some(rows) = r.rows {
                entry.push(rows);
            }
        }

        let tids = group_by_table_id.keys().copied().collect::<Vec<TableId>>();
        let table_infos = table_info_mgr
            .batch_get(&tids)
            .await
            .with_context(|_| TableNotFoundMetaSnafu {
                msg: format!("Failed to get table info for table ids: {:?}", tids),
            })?;

        let missing_tids = tids
            .iter()
            .filter(|id| !table_infos.contains_key(id))
            .collect::<Vec<_>>();
        if !missing_tids.is_empty() {
            warn!(
                "Failed to get table info for some table ids, expected: {:?}, missing (tables don't exist): {:?}",
                tids, missing_tids
            );
        }

        let group_by_table_name = group_by_table_id
            .into_iter()
            .filter_map(|(id, rows)| {
                let table_name = table_infos.get(&id).map(|info| info.table_name());
                let Some(table_name) = table_name else {
                    warn!("Failed to get table info for table id: {:?}", id);
                    return None;
                };
                let table_name = [
                    table_name.catalog_name,
                    table_name.schema_name,
                    table_name.table_name,
                ];
                Some((table_name, rows))
            })
            .collect::<HashMap<_, _>>();

        let group_by_table_name = Arc::new(group_by_table_name);

        let mut handles = Vec::new();
        let tasks = self.tasks.read().await;
        for task in tasks.values() {
            let src_table_names = &task.config.source_table_names;

            // Skip tasks that read none of the written tables.
            if src_table_names
                .iter()
                .all(|name| !group_by_table_name.contains_key(name))
            {
                continue;
            }

            let group_by_table_name = group_by_table_name.clone();
            let task = task.clone();

            let handle: JoinHandle<Result<(), Error>> = tokio::spawn(async move {
                let src_table_names = &task.config.source_table_names;

                let mut is_dirty = false;

                for src_table_name in src_table_names {
                    if let Some(entry) = group_by_table_name.get(src_table_name) {
                        // Without a time window expression we can't tell which
                        // windows are affected, so mark the whole task dirty.
                        let Some(expr) = &task.config.time_window_expr else {
                            is_dirty = true;
                            continue;
                        };
                        // Derive the lower bounds of the affected windows from the rows.
                        let involved_time_windows = expr.handle_rows(entry.clone()).await?;
                        let mut state = task.state.write().unwrap();
                        state
                            .dirty_time_windows
                            .add_lower_bounds(involved_time_windows.into_iter());
                    }
                }
                if is_dirty {
                    task.state.write().unwrap().dirty_time_windows.set_dirty();
                }

                Ok(())
            });
            handles.push(handle);
        }

        // Release the read lock before awaiting the spawned tasks.
        drop(tasks);
        for handle in handles {
            match handle.await {
                Err(e) => {
                    warn!("Failed to handle inserts: {e}");
                }
                Ok(Ok(())) => (),
                Ok(Err(e)) => {
                    warn!("Failed to handle inserts: {e}");
                }
            }
        }

        Ok(())
    }
}

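/// Look up the `[catalog, schema, table]` name triple for the given table id.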
async fn get_table_name(
    table_info: &TableInfoManager,
    table_id: &TableId,
) -> Result<TableName, Error> {
    get_table_info(table_info, table_id).await.map(|info| {
        let name = info.table_name();
        [name.catalog_name, name.schema_name, name.table_name]
    })
}

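/// Fetch the table info for the given table id, or error if the table doesn't exist.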
async fn get_table_info(
    table_info: &TableInfoManager,
    table_id: &TableId,
) -> Result<TableInfoValue, Error> {
    table_info
        .get(*table_id)
        .await
        .map_err(BoxedError::new)
        .context(ExternalSnafu)?
        .with_context(|| UnexpectedSnafu {
            reason: format!("Couldn't find table info for table id = {:?}", table_id),
        })
        .map(|info| info.into_inner())
}

impl BatchingEngine {
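    /// Create a [`BatchingTask`] for the flow described by `args`, spawn its
    /// executing loop, and register its shutdown channel. Returns `Ok(None)`
    /// when the flow already exists and `create_if_not_exists` is set without
    /// `or_replace`.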
    pub async fn create_flow_inner(&self, args: CreateFlowArgs) -> Result<Option<FlowId>, Error> {
        let CreateFlowArgs {
            flow_id,
            sink_table_name,
            source_table_ids,
            create_if_not_exists,
            or_replace,
            expire_after,
            eval_interval,
            comment: _,
            sql,
            flow_options,
            query_ctx,
        } = args;

        {
            let is_exist = self.tasks.read().await.contains_key(&flow_id);
            match (create_if_not_exists, or_replace, is_exist) {
                // `or_replace`: the old task is replaced when the new one is inserted below.
                (_, true, true) => {
                    info!("Replacing flow with id={}", flow_id);
                }
                (false, false, true) => FlowAlreadyExistSnafu { id: flow_id }.fail()?,
                (true, false, true) => {
                    info!("Flow with id={} already exists, do nothing", flow_id);
                    return Ok(None);
                }
                // The flow doesn't exist yet; proceed to create it.
                (_, _, false) => (),
            }
        }

        let query_ctx = query_ctx.context(UnexpectedSnafu {
            reason: "Query context is None".to_string(),
        })?;
        let query_ctx = Arc::new(query_ctx);
        let is_tql = is_tql(query_ctx.sql_dialect(), &sql)
            .map_err(BoxedError::new)
            .context(CreateFlowSnafu { sql: &sql })?;

        if eval_interval.is_none() && is_tql {
            InvalidQuerySnafu {
                reason: "TQL query requires EVAL INTERVAL to be set".to_string(),
            }
            .fail()?;
        }

        let flow_type = flow_options.get(FlowType::FLOW_TYPE_KEY);

        ensure!(
            match flow_type {
                None => true,
                Some(ty) if ty == FlowType::BATCHING => true,
                _ => false,
            },
            UnexpectedSnafu {
                reason: format!("Flow type is neither batching nor unset, got {flow_type:?}")
            }
        );

        let mut source_table_names = Vec::with_capacity(source_table_ids.len());
        for src_id in source_table_ids {
            let table_name = get_table_name(self.table_meta.table_info_manager(), &src_id).await?;
            let table_info = get_table_info(self.table_meta.table_info_manager(), &src_id).await?;
            // Tables with instant TTL don't retain data, so a batching query
            // over them would always see empty input.
            ensure!(
                table_info.table_info.meta.options.ttl != Some(TimeToLive::Instant),
                UnsupportedSnafu {
                    reason: format!(
                        "Source table `{}`(id={}) has instant TTL, which is not supported in batching mode; consider using a TTL longer than the flush interval",
                        table_name.join("."),
                        src_id
                    ),
                }
            );

            source_table_names.push(table_name);
        }

        let (tx, rx) = oneshot::channel();

        let plan = sql_to_df_plan(query_ctx.clone(), self.query_engine.clone(), &sql, true).await?;

        if is_tql {
            self.check_is_tql_table(&plan, &query_ctx).await?;
        }

        let (column_name, time_window_expr, _, df_schema) = find_time_window_expr(
            &plan,
            self.query_engine.engine_state().catalog_manager().clone(),
            query_ctx.clone(),
        )
        .await?;

        let phy_expr = time_window_expr
            .map(|expr| {
                TimeWindowExpr::from_expr(
                    &expr,
                    &column_name,
                    &df_schema,
                    &self.query_engine.engine_state().session_state(),
                )
            })
            .transpose()?;

        debug!(
            "Flow id={}, found time window expr={}",
            flow_id,
            phy_expr
                .as_ref()
                .map(|phy_expr| phy_expr.to_string())
                .unwrap_or("None".to_string())
        );

        let task_args = TaskArgs {
            flow_id,
            query: &sql,
            plan,
            time_window_expr: phy_expr,
            expire_after,
            sink_table_name,
            source_table_names,
            query_ctx,
            catalog_manager: self.catalog_manager.clone(),
            shutdown_rx: rx,
            batch_opts: self.batch_opts.clone(),
            flow_eval_interval: eval_interval.map(|secs| Duration::from_secs(secs as u64)),
        };

        let task = BatchingTask::try_new(task_args)?;

        let task_inner = task.clone();
        let engine = self.query_engine.clone();
        let frontend = self.frontend_client.clone();

        task.check_or_create_sink_table(&engine, &frontend).await?;

        let handle = common_runtime::spawn_global(async move {
            task_inner.start_executing_loop(engine, frontend).await;
        });
        task.state.write().unwrap().task_handle = Some(handle);

        // When replacing (`or_replace`), the old task (if any) is dropped here.
        let replaced_old_task_opt = self.tasks.write().await.insert(flow_id, task);
        drop(replaced_old_task_opt);

        self.shutdown_txs.write().await.insert(flow_id, tx);

        Ok(Some(flow_id))
    }

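    /// Check that every table referenced by a TQL query has a
    /// Prometheus-compatible schema: exactly one f64 value column, plus a
    /// timestamp column and string tag columns only.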
    async fn check_is_tql_table(
        &self,
        query: &LogicalPlan,
        query_ctx: &QueryContext,
    ) -> Result<(), Error> {
        // Collects every table referenced by the plan, including subqueries.
        struct CollectTableRef {
            table_refs: HashSet<datafusion_common::TableReference>,
        }

        impl TreeNodeVisitor<'_> for CollectTableRef {
            type Node = LogicalPlan;
            fn f_down(
                &mut self,
                node: &Self::Node,
            ) -> datafusion_common::Result<TreeNodeRecursion> {
                if let LogicalPlan::TableScan(scan) = node {
                    self.table_refs.insert(scan.table_name.clone());
                }
                Ok(TreeNodeRecursion::Continue)
            }
        }
        let mut table_refs = CollectTableRef {
            table_refs: HashSet::new(),
        };
        query
            .visit_with_subqueries(&mut table_refs)
            .context(DatafusionSnafu {
                context: "Checking if all source tables are TQL tables",
            })?;

        let default_catalog = query_ctx.current_catalog();
        let default_schema = query_ctx.current_schema();
        let default_schema = &default_schema;

        for table_ref in table_refs.table_refs {
            // Resolve bare/partial references against the query context's
            // default catalog and schema.
            let table_ref = match &table_ref {
                datafusion_common::TableReference::Bare { table } => {
                    TableReference::full(default_catalog, default_schema, table)
                }
                datafusion_common::TableReference::Partial { schema, table } => {
                    TableReference::full(default_catalog, schema, table)
                }
                datafusion_common::TableReference::Full {
                    catalog,
                    schema,
                    table,
                } => TableReference::full(catalog, schema, table),
            };

            let table_id = self
                .table_meta
                .table_name_manager()
                .get(table_ref.into())
                .await
                .map_err(BoxedError::new)
                .context(ExternalSnafu)?
                .with_context(|| UnexpectedSnafu {
                    reason: format!("Failed to get table id for table: {}", table_ref),
                })?
                .table_id();
            let table_info =
                get_table_info(self.table_meta.table_info_manager(), &table_id).await?;
            let value_cols = table_info
                .table_info
                .meta
                .schema
                .column_schemas
                .iter()
                .filter(|col| col.data_type == ConcreteDataType::float64_datatype())
                .collect::<Vec<_>>();
            ensure!(
                value_cols.len() == 1,
                InvalidQuerySnafu {
                    reason: format!(
                        "TQL query only supports one f64 value column, table `{}`(id={}) has {} f64 value columns, columns are: {:?}",
                        table_ref,
                        table_id,
                        value_cols.len(),
                        value_cols
                    ),
                }
            );
            let pk_idxs = table_info
                .table_info
                .meta
                .primary_key_indices
                .iter()
                .copied()
                .collect::<HashSet<_>>();

            for (idx, col) in table_info
                .table_info
                .meta
                .schema
                .column_schemas
                .iter()
                .enumerate()
            {
                let is_pk = pk_idxs.contains(&idx);

                ensure!(
                    col.data_type == ConcreteDataType::float64_datatype()
                        || col.data_type.is_timestamp()
                        || (col.data_type == ConcreteDataType::string_datatype() && is_pk),
                    InvalidQuerySnafu {
                        reason: format!(
                            "TQL query only supports f64 value columns, a timestamp column, and string tag columns; table `{}`(id={}) has column `{}` with type {:?} which is not supported",
                            table_ref, table_id, col.name, col.data_type
                        ),
                    }
                );
            }
        }
        Ok(())
    }

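    /// Remove the flow's task and signal its executing loop to shut down.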
    pub async fn remove_flow_inner(&self, flow_id: FlowId) -> Result<(), Error> {
        if self.tasks.write().await.remove(&flow_id).is_none() {
            warn!("Flow {flow_id} not found in tasks");
            FlowNotFoundSnafu { id: flow_id }.fail()?;
        }
        let Some(tx) = self.shutdown_txs.write().await.remove(&flow_id) else {
            UnexpectedSnafu {
                reason: format!("Can't find shutdown tx for flow {flow_id}"),
            }
            .fail()?
        };
        if tx.send(()).is_err() {
            warn!(
                "Failed to shut down flow {flow_id}: the receiver is already dropped, so the flow may already be removed"
            )
        }
        Ok(())
    }

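    /// Execute the flow once over its currently dirty time windows, returning
    /// the number of affected rows.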
    pub async fn flush_flow_inner(&self, flow_id: FlowId) -> Result<usize, Error> {
        debug!("Try flush flow {flow_id}");
        // Wait a bit so in-flight writes have a chance to mark their dirty
        // time windows before this flush executes.
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
        let task = self.tasks.read().await.get(&flow_id).cloned();
        let task = task.with_context(|| FlowNotFoundSnafu { id: flow_id })?;

        let time_window_size = task
            .config
            .time_window_expr
            .as_ref()
            .and_then(|expr| *expr.time_window_size());

        let cur_dirty_window_cnt = time_window_size.map(|time_window_size| {
            task.state
                .read()
                .unwrap()
                .dirty_time_windows
                .effective_count(&time_window_size)
        });

        let res = task
            .gen_exec_once(
                &self.query_engine,
                &self.frontend_client,
                cur_dirty_window_cnt,
            )
            .await?;

        let affected_rows = res.map(|(r, _)| r).unwrap_or_default() as usize;
        debug!(
            "Successfully flushed flow {flow_id}, affected rows={}",
            affected_rows
        );
        Ok(affected_rows)
    }

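    /// Whether a task for the given flow id is currently registered.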
    pub async fn flow_exist_inner(&self, flow_id: FlowId) -> bool {
        self.tasks.read().await.contains_key(&flow_id)
    }
}

impl FlowEngine for BatchingEngine {
    async fn create_flow(&self, args: CreateFlowArgs) -> Result<Option<FlowId>, Error> {
        self.create_flow_inner(args).await
    }

    async fn remove_flow(&self, flow_id: FlowId) -> Result<(), Error> {
        self.remove_flow_inner(flow_id).await
    }

    async fn flush_flow(&self, flow_id: FlowId) -> Result<usize, Error> {
        self.flush_flow_inner(flow_id).await
    }

    async fn flow_exist(&self, flow_id: FlowId) -> Result<bool, Error> {
        Ok(self.flow_exist_inner(flow_id).await)
    }

    async fn list_flows(&self) -> Result<impl IntoIterator<Item = FlowId>, Error> {
        Ok(self.tasks.read().await.keys().copied().collect::<Vec<_>>())
    }

    async fn handle_flow_inserts(
        &self,
        request: api::v1::region::InsertRequests,
    ) -> Result<(), Error> {
        self.handle_inserts_inner(request).await
    }

    async fn handle_mark_window_dirty(
        &self,
        req: api::v1::flow::DirtyWindowRequests,
    ) -> Result<(), Error> {
        self.handle_mark_dirty_time_window(req).await
    }
}