use std::collections::{BTreeMap, HashMap, HashSet};
use std::sync::Arc;
use std::time::Duration;

use api::v1::flow::DirtyWindowRequests;
use catalog::CatalogManagerRef;
use common_error::ext::BoxedError;
use common_meta::ddl::create_flow::FlowType;
use common_meta::key::TableMetadataManagerRef;
use common_meta::key::flow::FlowMetadataManagerRef;
use common_meta::key::table_info::{TableInfoManager, TableInfoValue};
use common_runtime::JoinHandle;
use common_telemetry::tracing::warn;
use common_telemetry::{debug, info};
use common_time::TimeToLive;
use datafusion_common::tree_node::{TreeNodeRecursion, TreeNodeVisitor};
use datafusion_expr::LogicalPlan;
use datatypes::prelude::ConcreteDataType;
use query::QueryEngineRef;
use session::context::QueryContext;
use snafu::{OptionExt, ResultExt, ensure};
use sql::parsers::utils::is_tql;
use store_api::metric_engine_consts::is_metric_engine_internal_column;
use store_api::storage::{RegionId, TableId};
use table::table_reference::TableReference;
use tokio::sync::{RwLock, oneshot};

use crate::batching_mode::BatchingModeOptions;
use crate::batching_mode::frontend_client::FrontendClient;
use crate::batching_mode::task::{BatchingTask, TaskArgs};
use crate::batching_mode::time_window::{TimeWindowExpr, find_time_window_expr};
use crate::batching_mode::utils::sql_to_df_plan;
use crate::engine::FlowEngine;
use crate::error::{
    CreateFlowSnafu, DatafusionSnafu, ExternalSnafu, FlowAlreadyExistSnafu, FlowNotFoundSnafu,
    InvalidQuerySnafu, TableNotFoundMetaSnafu, UnexpectedSnafu, UnsupportedSnafu,
};
use crate::metrics::METRIC_FLOW_BATCHING_ENGINE_BULK_MARK_TIME_WINDOW;
use crate::{CreateFlowArgs, Error, FlowId, TableName};

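/// Engine that runs flows in batching mode: each flow is a periodically
/// scheduled [`BatchingTask`] that re-executes its query against the frontend
/// over the time windows marked dirty by incoming writes.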
pub struct BatchingEngine {
    tasks: RwLock<BTreeMap<FlowId, BatchingTask>>,
    shutdown_txs: RwLock<BTreeMap<FlowId, oneshot::Sender<()>>>,
    pub(crate) frontend_client: Arc<FrontendClient>,
    flow_metadata_manager: FlowMetadataManagerRef,
    table_meta: TableMetadataManagerRef,
    catalog_manager: CatalogManagerRef,
    query_engine: QueryEngineRef,
    pub(crate) batch_opts: Arc<BatchingModeOptions>,
}

impl BatchingEngine {
    pub fn new(
        frontend_client: Arc<FrontendClient>,
        query_engine: QueryEngineRef,
        flow_metadata_manager: FlowMetadataManagerRef,
        table_meta: TableMetadataManagerRef,
        catalog_manager: CatalogManagerRef,
        batch_opts: BatchingModeOptions,
    ) -> Self {
        Self {
            tasks: Default::default(),
            shutdown_txs: Default::default(),
            frontend_client,
            flow_metadata_manager,
            table_meta,
            catalog_manager,
            query_engine,
            batch_opts: Arc::new(batch_opts),
        }
    }

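    /// Marks the time windows referenced by `reqs` dirty for every flow whose
    /// source tables are involved, so the next scheduled run recomputes them.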
    pub async fn handle_mark_dirty_time_window(
        &self,
        reqs: DirtyWindowRequests,
    ) -> Result<(), Error> {
        let table_info_mgr = self.table_meta.table_info_manager();

        let mut group_by_table_id: HashMap<TableId, Vec<_>> = HashMap::new();
        for r in reqs.requests {
            let tid = TableId::from(r.table_id);
            let entry = group_by_table_id.entry(tid).or_default();
            entry.extend(r.timestamps);
        }
        let tids = group_by_table_id.keys().copied().collect::<Vec<TableId>>();
        let table_infos =
            table_info_mgr
                .batch_get(&tids)
                .await
                .with_context(|_| TableNotFoundMetaSnafu {
                    msg: format!("Failed to get table info for table ids: {:?}", tids),
                })?;

        let group_by_table_name = group_by_table_id
            .into_iter()
            .filter_map(|(id, timestamps)| {
                let Some(info) = table_infos.get(&id) else {
                    warn!("Failed to get table info for table id: {:?}", id);
                    return None;
                };
                let table_name = info.table_name();
                let table_name = [
                    table_name.catalog_name,
                    table_name.schema_name,
                    table_name.table_name,
                ];
                // Every table is expected to have a time index column, so the
                // unwraps below should hold.
                let schema = &info.table_info.meta.schema;
                let time_index_unit = schema.column_schemas[schema.timestamp_index.unwrap()]
                    .data_type
                    .as_timestamp()
                    .unwrap()
                    .unit();
                Some((table_name, (timestamps, time_index_unit)))
            })
            .collect::<HashMap<_, _>>();

        let group_by_table_name = Arc::new(group_by_table_name);

        let mut handles = Vec::new();
        let tasks = self.tasks.read().await;

        for task in tasks.values() {
            let src_table_names = &task.config.source_table_names;

            if src_table_names
                .iter()
                .all(|name| !group_by_table_name.contains_key(name))
            {
                continue;
            }

            let group_by_table_name = group_by_table_name.clone();
            let task = task.clone();

            let handle: JoinHandle<Result<(), Error>> = tokio::spawn(async move {
                let src_table_names = &task.config.source_table_names;
                let mut all_dirty_windows = HashSet::new();
                let mut is_dirty = false;
                for src_table_name in src_table_names {
                    if let Some((timestamps, unit)) = group_by_table_name.get(src_table_name) {
                        // Without a time window expression, individual windows
                        // can't be computed, so the whole flow is marked dirty.
                        let Some(expr) = &task.config.time_window_expr else {
                            is_dirty = true;
                            continue;
                        };
                        for timestamp in timestamps {
                            let align_start = expr
                                .eval(common_time::Timestamp::new(*timestamp, *unit))?
                                .0
                                .context(UnexpectedSnafu {
                                    reason: "Failed to eval start value",
                                })?;
                            all_dirty_windows.insert(align_start);
                        }
                    }
                }
                let mut state = task.state.write().unwrap();
                if is_dirty {
                    state.dirty_time_windows.set_dirty();
                }
                let flow_id_label = task.config.flow_id.to_string();
                for timestamp in all_dirty_windows {
                    state.dirty_time_windows.add_window(timestamp, None);
                }

                METRIC_FLOW_BATCHING_ENGINE_BULK_MARK_TIME_WINDOW
                    .with_label_values(&[&flow_id_label])
                    .set(state.dirty_time_windows.len() as f64);
                Ok(())
            });
            handles.push(handle);
        }
        drop(tasks);
        for handle in handles {
            match handle.await {
                Err(e) => {
                    warn!("Failed to mark dirty time windows: {e}");
                }
                Ok(Ok(())) => (),
                Ok(Err(e)) => {
                    warn!("Failed to mark dirty time windows: {e}");
                }
            }
        }

        Ok(())
    }

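    /// Handles mirrored insert requests: groups the new rows by source table,
    /// evaluates each affected flow's time window expression over them, and
    /// marks the resulting windows dirty.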
    pub async fn handle_inserts_inner(
        &self,
        request: api::v1::region::InsertRequests,
    ) -> Result<(), Error> {
        let table_info_mgr = self.table_meta.table_info_manager();
        let mut group_by_table_id: HashMap<TableId, Vec<api::v1::Rows>> = HashMap::new();

        for r in request.requests {
            let tid = RegionId::from(r.region_id).table_id();
            let entry = group_by_table_id.entry(tid).or_default();
            if let Some(rows) = r.rows {
                entry.push(rows);
            }
        }

        let tids = group_by_table_id.keys().copied().collect::<Vec<TableId>>();
        let table_infos =
            table_info_mgr
                .batch_get(&tids)
                .await
                .with_context(|_| TableNotFoundMetaSnafu {
                    msg: format!("Failed to get table info for table ids: {:?}", tids),
                })?;

        let missing_tids = tids
            .iter()
            .filter(|id| !table_infos.contains_key(id))
            .collect::<Vec<_>>();
        if !missing_tids.is_empty() {
            warn!(
                "Failed to get table info for some table ids, expected: {:?}, missing (tables don't exist): {:?}",
                tids, missing_tids
            );
        }

        let group_by_table_name = group_by_table_id
            .into_iter()
            .filter_map(|(id, rows)| {
                let Some(info) = table_infos.get(&id) else {
                    warn!("Failed to get table info for table id: {:?}", id);
                    return None;
                };
                let table_name = info.table_name();
                let table_name = [
                    table_name.catalog_name,
                    table_name.schema_name,
                    table_name.table_name,
                ];
                Some((table_name, rows))
            })
            .collect::<HashMap<_, _>>();

        let group_by_table_name = Arc::new(group_by_table_name);

        let mut handles = Vec::new();
        let tasks = self.tasks.read().await;
        for task in tasks.values() {
            let src_table_names = &task.config.source_table_names;

            if src_table_names
                .iter()
                .all(|name| !group_by_table_name.contains_key(name))
            {
                continue;
            }

            let group_by_table_name = group_by_table_name.clone();
            let task = task.clone();

            let handle: JoinHandle<Result<(), Error>> = tokio::spawn(async move {
                let src_table_names = &task.config.source_table_names;

                let mut is_dirty = false;

                for src_table_name in src_table_names {
                    if let Some(entry) = group_by_table_name.get(src_table_name) {
                        let Some(expr) = &task.config.time_window_expr else {
                            is_dirty = true;
                            continue;
                        };
                        let involved_time_windows = expr.handle_rows(entry.clone()).await?;
                        let mut state = task.state.write().unwrap();
                        state
                            .dirty_time_windows
                            .add_lower_bounds(involved_time_windows.into_iter());
                    }
                }
                if is_dirty {
                    task.state.write().unwrap().dirty_time_windows.set_dirty();
                }

                Ok(())
            });
            handles.push(handle);
        }
        // Release the read lock before awaiting the spawned tasks.
        drop(tasks);

        for handle in handles {
            match handle.await {
                Err(e) => {
                    warn!("Failed to handle inserts: {e}");
                }
                Ok(Ok(())) => (),
                Ok(Err(e)) => {
                    warn!("Failed to handle inserts: {e}");
                }
            }
        }

        Ok(())
    }
}

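/// Resolves a table id to its fully qualified `[catalog, schema, table]` name.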
async fn get_table_name(
    table_info: &TableInfoManager,
    table_id: &TableId,
) -> Result<TableName, Error> {
    get_table_info(table_info, table_id).await.map(|info| {
        let name = info.table_name();
        [name.catalog_name, name.schema_name, name.table_name]
    })
}

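/// Fetches the [`TableInfoValue`] for `table_id`, returning an error if the
/// table does not exist in metadata.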
async fn get_table_info(
    table_info: &TableInfoManager,
    table_id: &TableId,
) -> Result<TableInfoValue, Error> {
    table_info
        .get(*table_id)
        .await
        .map_err(BoxedError::new)
        .context(ExternalSnafu)?
        .with_context(|| UnexpectedSnafu {
            reason: format!("Table id = {:?}, couldn't find table info", table_id),
        })
        .map(|info| info.into_inner())
}

impl BatchingEngine {
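    /// Creates a batching-mode flow task and spawns its execution loop.
    ///
    /// Returns `Ok(None)` when the flow already exists and
    /// `create_if_not_exists` is set without `or_replace`.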
    pub async fn create_flow_inner(&self, args: CreateFlowArgs) -> Result<Option<FlowId>, Error> {
        let CreateFlowArgs {
            flow_id,
            sink_table_name,
            source_table_ids,
            create_if_not_exists,
            or_replace,
            expire_after,
            eval_interval,
            comment: _,
            sql,
            flow_options,
            query_ctx,
        } = args;

        {
            let is_exist = self.tasks.read().await.contains_key(&flow_id);
            match (create_if_not_exists, or_replace, is_exist) {
                (_, true, true) => {
                    info!("Replacing flow with id={}", flow_id);
                }
                (false, false, true) => FlowAlreadyExistSnafu { id: flow_id }.fail()?,
                (true, false, true) => {
                    info!("Flow with id={} already exists, do nothing", flow_id);
                    return Ok(None);
                }
                (_, _, false) => (),
            }
        }

        let query_ctx = query_ctx.context(UnexpectedSnafu {
            reason: "Query context is None".to_string(),
        })?;
        let query_ctx = Arc::new(query_ctx);
        let is_tql = is_tql(query_ctx.sql_dialect(), &sql)
            .map_err(BoxedError::new)
            .context(CreateFlowSnafu { sql: &sql })?;

        if eval_interval.is_none() && is_tql {
            InvalidQuerySnafu {
                reason: "TQL query requires EVAL INTERVAL to be set".to_string(),
            }
            .fail()?;
        }

        let flow_type = flow_options.get(FlowType::FLOW_TYPE_KEY);

        ensure!(
            match flow_type {
                None => true,
                Some(ty) if ty == FlowType::BATCHING => true,
                _ => false,
            },
            UnexpectedSnafu {
                reason: format!("Flow type is neither batching nor None, got {flow_type:?}")
            }
        );

        let mut source_table_names = Vec::with_capacity(2);
        for src_id in source_table_ids {
            let table_name = get_table_name(self.table_meta.table_info_manager(), &src_id).await?;
            let table_info = get_table_info(self.table_meta.table_info_manager(), &src_id).await?;
            ensure!(
                table_info.table_info.meta.options.ttl != Some(TimeToLive::Instant),
                UnsupportedSnafu {
                    reason: format!(
                        "Source table `{}` (id={}) has instant TTL, which is not supported in batching mode. Consider using a TTL longer than the flush interval",
                        table_name.join("."),
                        src_id
                    ),
                }
            );

            source_table_names.push(table_name);
        }

        let (tx, rx) = oneshot::channel();

        let plan = sql_to_df_plan(query_ctx.clone(), self.query_engine.clone(), &sql, true).await?;

        if is_tql {
            self.check_is_tql_table(&plan, &query_ctx).await?;
        }

        // TQL plans don't go through SQL time window extraction.
        let phy_expr = if !is_tql {
            let (column_name, time_window_expr, _, df_schema) = find_time_window_expr(
                &plan,
                self.query_engine.engine_state().catalog_manager().clone(),
                query_ctx.clone(),
            )
            .await?;
            time_window_expr
                .map(|expr| {
                    TimeWindowExpr::from_expr(
                        &expr,
                        &column_name,
                        &df_schema,
                        &self.query_engine.engine_state().session_state(),
                    )
                })
                .transpose()?
        } else {
            None
        };

        debug!(
            "Flow id={}, found time window expr={}",
            flow_id,
            phy_expr
                .as_ref()
                .map(|phy_expr| phy_expr.to_string())
                .unwrap_or_else(|| "None".to_string())
        );

        let task_args = TaskArgs {
            flow_id,
            query: &sql,
            plan,
            time_window_expr: phy_expr,
            expire_after,
            sink_table_name,
            source_table_names,
            query_ctx,
            catalog_manager: self.catalog_manager.clone(),
            shutdown_rx: rx,
            batch_opts: self.batch_opts.clone(),
            flow_eval_interval: eval_interval.map(|secs| Duration::from_secs(secs as u64)),
        };

        let task = BatchingTask::try_new(task_args)?;

        let task_inner = task.clone();
        let engine = self.query_engine.clone();
        let frontend = self.frontend_client.clone();

        task.check_or_create_sink_table(&engine, &frontend).await?;

        let handle = common_runtime::spawn_global(async move {
            task_inner.start_executing_loop(engine, frontend).await;
        });
        task.state.write().unwrap().task_handle = Some(handle);

        // Any previously registered task for this flow id is replaced here and
        // dropped after the write lock is released; its shutdown sender is
        // replaced below as well.
        let replaced_old_task_opt = self.tasks.write().await.insert(flow_id, task);
        drop(replaced_old_task_opt);

        self.shutdown_txs.write().await.insert(flow_id, tx);

        Ok(Some(flow_id))
    }

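    /// Validates that every table referenced by a TQL query has a
    /// Prometheus-compatible schema: exactly one f64 value column, a
    /// timestamp column, and only string tag columns in the primary key.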
    async fn check_is_tql_table(
        &self,
        query: &LogicalPlan,
        query_ctx: &QueryContext,
    ) -> Result<(), Error> {
        struct CollectTableRef {
            table_refs: HashSet<datafusion_common::TableReference>,
        }

        impl TreeNodeVisitor<'_> for CollectTableRef {
            type Node = LogicalPlan;
            fn f_down(
                &mut self,
                node: &Self::Node,
            ) -> datafusion_common::Result<TreeNodeRecursion> {
                if let LogicalPlan::TableScan(scan) = node {
                    self.table_refs.insert(scan.table_name.clone());
                }
                Ok(TreeNodeRecursion::Continue)
            }
        }
        let mut table_refs = CollectTableRef {
            table_refs: HashSet::new(),
        };
        query
            .visit_with_subqueries(&mut table_refs)
            .context(DatafusionSnafu {
                context: "Checking if all source tables are TQL tables",
            })?;

        let default_catalog = query_ctx.current_catalog();
        let default_schema = query_ctx.current_schema();
        let default_schema = &default_schema;

        for table_ref in table_refs.table_refs {
            let table_ref = match &table_ref {
                datafusion_common::TableReference::Bare { table } => {
                    TableReference::full(default_catalog, default_schema, table)
                }
                datafusion_common::TableReference::Partial { schema, table } => {
                    TableReference::full(default_catalog, schema, table)
                }
                datafusion_common::TableReference::Full {
                    catalog,
                    schema,
                    table,
                } => TableReference::full(catalog, schema, table),
            };

            let table_id = self
                .table_meta
                .table_name_manager()
                .get(table_ref.into())
                .await
                .map_err(BoxedError::new)
                .context(ExternalSnafu)?
                .with_context(|| UnexpectedSnafu {
                    reason: format!("Failed to get table id for table: {}", table_ref),
                })?
                .table_id();
            let table_info =
                get_table_info(self.table_meta.table_info_manager(), &table_id).await?;
            let value_cols = table_info
                .table_info
                .meta
                .schema
                .column_schemas
                .iter()
                .filter(|col| col.data_type == ConcreteDataType::float64_datatype())
                .collect::<Vec<_>>();
            ensure!(
                value_cols.len() == 1,
                InvalidQuerySnafu {
                    reason: format!(
                        "TQL query only supports one f64 value column, table `{}`(id={}) has {} f64 value columns, columns are: {:?}",
                        table_ref,
                        table_id,
                        value_cols.len(),
                        value_cols
                    ),
                }
            );
            let pk_idxs = table_info
                .table_info
                .meta
                .primary_key_indices
                .iter()
                .copied()
                .collect::<HashSet<usize>>();

            for (idx, col) in table_info
                .table_info
                .meta
                .schema
                .column_schemas
                .iter()
                .enumerate()
            {
                // Metric engine internal columns are exempt from the check.
                if is_metric_engine_internal_column(&col.name) {
                    continue;
                }
                let is_pk = pk_idxs.contains(&idx);

                ensure!(
                    col.data_type == ConcreteDataType::float64_datatype()
                        || col.data_type.is_timestamp()
                        || (col.data_type == ConcreteDataType::string_datatype() && is_pk),
                    InvalidQuerySnafu {
                        reason: format!(
                            "TQL query only supports f64 value columns, a timestamp column, and string tag columns; table `{}`(id={}) has column `{}` with type {:?} which is not supported",
                            table_ref, table_id, col.name, col.data_type
                        ),
                    }
                );
            }
        }
        Ok(())
    }

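    /// Removes a flow task and signals its execution loop to shut down.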
    pub async fn remove_flow_inner(&self, flow_id: FlowId) -> Result<(), Error> {
        if self.tasks.write().await.remove(&flow_id).is_none() {
            warn!("Flow {flow_id} not found in tasks");
            FlowNotFoundSnafu { id: flow_id }.fail()?;
        }
        let Some(tx) = self.shutdown_txs.write().await.remove(&flow_id) else {
            UnexpectedSnafu {
                reason: format!("Can't find shutdown tx for flow {flow_id}"),
            }
            .fail()?
        };
        if tx.send(()).is_err() {
            warn!(
                "Failed to shut down flow {flow_id}: receiver already dropped, maybe flow {flow_id} was already dropped?"
            )
        }
        Ok(())
    }

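    /// Flushes a flow once: executes it immediately over its currently dirty
    /// time windows and returns the number of affected rows.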
    pub async fn flush_flow_inner(&self, flow_id: FlowId) -> Result<usize, Error> {
        debug!("Try flush flow {flow_id}");
        // Brief pause so that insert requests mirrored just before the flush
        // have a chance to mark their time windows dirty first.
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
        let task = self.tasks.read().await.get(&flow_id).cloned();
        let task = task.with_context(|| FlowNotFoundSnafu { id: flow_id })?;

        let time_window_size = task
            .config
            .time_window_expr
            .as_ref()
            .and_then(|expr| *expr.time_window_size());

        let cur_dirty_window_cnt = time_window_size.map(|time_window_size| {
            task.state
                .read()
                .unwrap()
                .dirty_time_windows
                .effective_count(&time_window_size)
        });

        let res = task
            .gen_exec_once(
                &self.query_engine,
                &self.frontend_client,
                cur_dirty_window_cnt,
            )
            .await?;

        let affected_rows = res.map(|(r, _)| r).unwrap_or_default() as usize;
        debug!("Successfully flushed flow {flow_id}, affected rows={affected_rows}");
        Ok(affected_rows)
    }

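    /// Returns whether a task for `flow_id` is currently registered.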
    pub async fn flow_exist_inner(&self, flow_id: FlowId) -> bool {
        self.tasks.read().await.contains_key(&flow_id)
    }
}

impl FlowEngine for BatchingEngine {
    async fn create_flow(&self, args: CreateFlowArgs) -> Result<Option<FlowId>, Error> {
        self.create_flow_inner(args).await
    }
    async fn remove_flow(&self, flow_id: FlowId) -> Result<(), Error> {
        self.remove_flow_inner(flow_id).await
    }
    async fn flush_flow(&self, flow_id: FlowId) -> Result<usize, Error> {
        self.flush_flow_inner(flow_id).await
    }
    async fn flow_exist(&self, flow_id: FlowId) -> Result<bool, Error> {
        Ok(self.flow_exist_inner(flow_id).await)
    }
    async fn list_flows(&self) -> Result<impl IntoIterator<Item = FlowId>, Error> {
        Ok(self.tasks.read().await.keys().cloned().collect::<Vec<_>>())
    }
    async fn handle_flow_inserts(
        &self,
        request: api::v1::region::InsertRequests,
    ) -> Result<(), Error> {
        self.handle_inserts_inner(request).await
    }
    async fn handle_mark_window_dirty(
        &self,
        req: api::v1::flow::DirtyWindowRequests,
    ) -> Result<(), Error> {
        self.handle_mark_dirty_time_window(req).await
    }
}