1use std::collections::{BTreeMap, HashMap, HashSet};
18use std::sync::Arc;
19use std::time::Duration;
20
21use api::v1::flow::DirtyWindowRequests;
22use catalog::CatalogManagerRef;
23use common_error::ext::BoxedError;
24use common_meta::ddl::create_flow::FlowType;
25use common_meta::key::TableMetadataManagerRef;
26use common_meta::key::flow::FlowMetadataManagerRef;
27use common_meta::key::flow::flow_state::FlowStat;
28use common_meta::key::table_info::{TableInfoManager, TableInfoValue};
29use common_runtime::JoinHandle;
30use common_telemetry::tracing::warn;
31use common_telemetry::{debug, info};
32use common_time::TimeToLive;
33use datafusion_common::tree_node::{TreeNodeRecursion, TreeNodeVisitor};
34use datafusion_expr::LogicalPlan;
35use datatypes::prelude::ConcreteDataType;
36use query::QueryEngineRef;
37use session::context::QueryContext;
38use snafu::{OptionExt, ResultExt, ensure};
39use sql::parsers::utils::is_tql;
40use store_api::metric_engine_consts::is_metric_engine_internal_column;
41use store_api::storage::{RegionId, TableId};
42use table::table_reference::TableReference;
43use tokio::sync::{RwLock, oneshot};
44
45use crate::batching_mode::BatchingModeOptions;
46use crate::batching_mode::frontend_client::FrontendClient;
47use crate::batching_mode::task::{BatchingTask, TaskArgs};
48use crate::batching_mode::time_window::{TimeWindowExpr, find_time_window_expr};
49use crate::batching_mode::utils::sql_to_df_plan;
50use crate::engine::{FlowEngine, FlowStatProvider};
51use crate::error::{
52 CreateFlowSnafu, DatafusionSnafu, ExternalSnafu, FlowAlreadyExistSnafu, FlowNotFoundSnafu,
53 InvalidQuerySnafu, TableNotFoundMetaSnafu, UnexpectedSnafu, UnsupportedSnafu,
54};
55use crate::metrics::METRIC_FLOW_BATCHING_ENGINE_BULK_MARK_TIME_WINDOW;
56use crate::{CreateFlowArgs, Error, FlowId, TableName};
57
/// Engine for batching-mode flows. Owns the per-flow [`BatchingTask`]s and the
/// one-shot channels used to shut their executing loops down.
pub struct BatchingEngine {
    // All registered flow tasks, keyed by flow id.
    tasks: RwLock<BTreeMap<FlowId, BatchingTask>>,
    // One-shot senders used to signal each flow's executing loop to stop;
    // kept in lockstep with `tasks` (inserted on create, removed on drop).
    shutdown_txs: RwLock<BTreeMap<FlowId, oneshot::Sender<()>>>,
    // Client used to talk to the frontend (e.g. creating sink tables,
    // executing the flow's query).
    pub(crate) frontend_client: Arc<FrontendClient>,
    // Flow metadata access — NOTE(review): not read in this chunk; presumably
    // used elsewhere in the file.
    flow_metadata_manager: FlowMetadataManagerRef,
    // Table metadata (table info / table name managers).
    table_meta: TableMetadataManagerRef,
    catalog_manager: CatalogManagerRef,
    query_engine: QueryEngineRef,
    // Options shared by every batching task spawned from this engine.
    pub(crate) batch_opts: Arc<BatchingModeOptions>,
}
74
75impl BatchingEngine {
76 pub fn new(
77 frontend_client: Arc<FrontendClient>,
78 query_engine: QueryEngineRef,
79 flow_metadata_manager: FlowMetadataManagerRef,
80 table_meta: TableMetadataManagerRef,
81 catalog_manager: CatalogManagerRef,
82 batch_opts: BatchingModeOptions,
83 ) -> Self {
84 Self {
85 tasks: Default::default(),
86 shutdown_txs: Default::default(),
87 frontend_client,
88 flow_metadata_manager,
89 table_meta,
90 catalog_manager,
91 query_engine,
92 batch_opts: Arc::new(batch_opts),
93 }
94 }
95
96 pub async fn get_last_exec_time_map(&self) -> BTreeMap<FlowId, i64> {
98 let tasks = self.tasks.read().await;
99 tasks
100 .iter()
101 .filter_map(|(flow_id, task)| {
102 task.last_execution_time_millis()
103 .map(|timestamp| (*flow_id, timestamp))
104 })
105 .collect()
106 }
107
108 pub async fn handle_mark_dirty_time_window(
109 &self,
110 reqs: DirtyWindowRequests,
111 ) -> Result<(), Error> {
112 let table_info_mgr = self.table_meta.table_info_manager();
113
114 let mut group_by_table_id: HashMap<u32, Vec<_>> = HashMap::new();
115 for r in reqs.requests {
116 let tid = TableId::from(r.table_id);
117 let entry = group_by_table_id.entry(tid).or_default();
118 entry.extend(r.timestamps);
119 }
120 let tids = group_by_table_id.keys().cloned().collect::<Vec<TableId>>();
121 let table_infos =
122 table_info_mgr
123 .batch_get(&tids)
124 .await
125 .with_context(|_| TableNotFoundMetaSnafu {
126 msg: format!("Failed to get table info for table ids: {:?}", tids),
127 })?;
128
129 let group_by_table_name = group_by_table_id
130 .into_iter()
131 .filter_map(|(id, timestamps)| {
132 let table_name = table_infos.get(&id).map(|info| info.table_name());
133 let Some(table_name) = table_name else {
134 warn!("Failed to get table infos for table id: {:?}", id);
135 return None;
136 };
137 let table_name = [
138 table_name.catalog_name,
139 table_name.schema_name,
140 table_name.table_name,
141 ];
142 let schema = &table_infos.get(&id).unwrap().table_info.meta.schema;
143 let time_index_unit = schema.column_schemas()[schema.timestamp_index().unwrap()]
144 .data_type
145 .as_timestamp()
146 .unwrap()
147 .unit();
148 Some((table_name, (timestamps, time_index_unit)))
149 })
150 .collect::<HashMap<_, _>>();
151
152 let group_by_table_name = Arc::new(group_by_table_name);
153
154 let mut handles = Vec::new();
155 let tasks = self.tasks.read().await;
156
157 for (_flow_id, task) in tasks.iter() {
158 let src_table_names = &task.config.source_table_names;
159
160 if src_table_names
161 .iter()
162 .all(|name| !group_by_table_name.contains_key(name))
163 {
164 continue;
165 }
166
167 let group_by_table_name = group_by_table_name.clone();
168 let task = task.clone();
169
170 let handle: JoinHandle<Result<(), Error>> = tokio::spawn(async move {
171 let src_table_names = &task.config.source_table_names;
172 let mut all_dirty_windows = HashSet::new();
173 let mut is_dirty = false;
174 for src_table_name in src_table_names {
175 if let Some((timestamps, unit)) = group_by_table_name.get(src_table_name) {
176 let Some(expr) = &task.config.time_window_expr else {
177 is_dirty = true;
178 continue;
179 };
180 for timestamp in timestamps {
181 let align_start = expr
182 .eval(common_time::Timestamp::new(*timestamp, *unit))?
183 .0
184 .context(UnexpectedSnafu {
185 reason: "Failed to eval start value",
186 })?;
187 all_dirty_windows.insert(align_start);
188 }
189 }
190 }
191 let mut state = task.state.write().unwrap();
192 if is_dirty {
193 state.dirty_time_windows.set_dirty();
194 }
195 let flow_id_label = task.config.flow_id.to_string();
196 for timestamp in all_dirty_windows {
197 state.dirty_time_windows.add_window(timestamp, None);
198 }
199
200 METRIC_FLOW_BATCHING_ENGINE_BULK_MARK_TIME_WINDOW
201 .with_label_values(&[&flow_id_label])
202 .set(state.dirty_time_windows.len() as f64);
203 Ok(())
204 });
205 handles.push(handle);
206 }
207 drop(tasks);
208 for handle in handles {
209 match handle.await {
210 Err(e) => {
211 warn!("Failed to handle inserts: {e}");
212 }
213 Ok(Ok(())) => (),
214 Ok(Err(e)) => {
215 warn!("Failed to handle inserts: {e}");
216 }
217 }
218 }
219
220 Ok(())
221 }
222
223 pub async fn handle_inserts_inner(
224 &self,
225 request: api::v1::region::InsertRequests,
226 ) -> Result<(), Error> {
227 let table_info_mgr = self.table_meta.table_info_manager();
228 let mut group_by_table_id: HashMap<TableId, Vec<api::v1::Rows>> = HashMap::new();
229
230 for r in request.requests {
231 let tid = RegionId::from(r.region_id).table_id();
232 let entry = group_by_table_id.entry(tid).or_default();
233 if let Some(rows) = r.rows {
234 entry.push(rows);
235 }
236 }
237
238 let tids = group_by_table_id.keys().cloned().collect::<Vec<TableId>>();
239 let table_infos =
240 table_info_mgr
241 .batch_get(&tids)
242 .await
243 .with_context(|_| TableNotFoundMetaSnafu {
244 msg: format!("Failed to get table info for table ids: {:?}", tids),
245 })?;
246
247 let missing_tids = tids
248 .iter()
249 .filter(|id| !table_infos.contains_key(id))
250 .collect::<Vec<_>>();
251 if !missing_tids.is_empty() {
252 warn!(
253 "Failed to get all the table info for table ids, expected table ids: {:?}, those table doesn't exist: {:?}",
254 tids, missing_tids
255 );
256 }
257
258 let group_by_table_name = group_by_table_id
259 .into_iter()
260 .filter_map(|(id, rows)| {
261 let table_name = table_infos.get(&id).map(|info| info.table_name());
262 let Some(table_name) = table_name else {
263 warn!("Failed to get table infos for table id: {:?}", id);
264 return None;
265 };
266 let table_name = [
267 table_name.catalog_name,
268 table_name.schema_name,
269 table_name.table_name,
270 ];
271 Some((table_name, rows))
272 })
273 .collect::<HashMap<_, _>>();
274
275 let group_by_table_name = Arc::new(group_by_table_name);
276
277 let mut handles = Vec::new();
278 let tasks = self.tasks.read().await;
279 for (_flow_id, task) in tasks.iter() {
280 let src_table_names = &task.config.source_table_names;
281
282 if src_table_names
283 .iter()
284 .all(|name| !group_by_table_name.contains_key(name))
285 {
286 continue;
287 }
288
289 let group_by_table_name = group_by_table_name.clone();
290 let task = task.clone();
291
292 let handle: JoinHandle<Result<(), Error>> = tokio::spawn(async move {
293 let src_table_names = &task.config.source_table_names;
294
295 let mut is_dirty = false;
296
297 for src_table_name in src_table_names {
298 if let Some(entry) = group_by_table_name.get(src_table_name) {
299 let Some(expr) = &task.config.time_window_expr else {
300 is_dirty = true;
301 continue;
302 };
303 let involved_time_windows = expr.handle_rows(entry.clone()).await?;
304 let mut state = task.state.write().unwrap();
305 state
306 .dirty_time_windows
307 .add_lower_bounds(involved_time_windows.into_iter());
308 }
309 }
310 if is_dirty {
311 task.state.write().unwrap().dirty_time_windows.set_dirty();
312 }
313
314 Ok(())
315 });
316 handles.push(handle);
317 }
318
319 for handle in handles {
320 match handle.await {
321 Err(e) => {
322 warn!("Failed to handle inserts: {e}");
323 }
324 Ok(Ok(())) => (),
325 Ok(Err(e)) => {
326 warn!("Failed to handle inserts: {e}");
327 }
328 }
329 }
330 drop(tasks);
331
332 Ok(())
333 }
334}
335
336impl FlowStatProvider for BatchingEngine {
337 async fn flow_stat(&self) -> FlowStat {
338 FlowStat {
339 state_size: BTreeMap::new(),
340 last_exec_time_map: self
341 .get_last_exec_time_map()
342 .await
343 .into_iter()
344 .map(|(flow_id, timestamp)| (flow_id as u32, timestamp))
345 .collect(),
346 }
347 }
348}
349
350async fn get_table_name(
351 table_info: &TableInfoManager,
352 table_id: &TableId,
353) -> Result<TableName, Error> {
354 get_table_info(table_info, table_id).await.map(|info| {
355 let name = info.table_name();
356 [name.catalog_name, name.schema_name, name.table_name]
357 })
358}
359
360async fn get_table_info(
361 table_info: &TableInfoManager,
362 table_id: &TableId,
363) -> Result<TableInfoValue, Error> {
364 table_info
365 .get(*table_id)
366 .await
367 .map_err(BoxedError::new)
368 .context(ExternalSnafu)?
369 .with_context(|| UnexpectedSnafu {
370 reason: format!("Table id = {:?}, couldn't found table name", table_id),
371 })
372 .map(|info| info.into_inner())
373}
374
impl BatchingEngine {
    /// Creates (or, with `or_replace`, replaces) a batching flow task and
    /// starts its executing loop.
    ///
    /// Returns `Ok(Some(flow_id))` when the flow was (re)created, and
    /// `Ok(None)` when it already exists and `create_if_not_exists` is set.
    pub async fn create_flow_inner(&self, args: CreateFlowArgs) -> Result<Option<FlowId>, Error> {
        let CreateFlowArgs {
            flow_id,
            sink_table_name,
            source_table_ids,
            create_if_not_exists,
            or_replace,
            expire_after,
            eval_interval,
            comment: _,
            sql,
            flow_options,
            query_ctx,
        } = args;

        {
            // Resolve what to do if a flow with this id already exists,
            // based on the (create_if_not_exists, or_replace) combination.
            let is_exist = self.tasks.read().await.contains_key(&flow_id);
            match (create_if_not_exists, or_replace, is_exist) {
                // `OR REPLACE` wins regardless of `IF NOT EXISTS`: fall
                // through and overwrite the existing task below.
                (_, true, true) => {
                    info!("Replacing flow with id={}", flow_id);
                }
                (false, false, true) => FlowAlreadyExistSnafu { id: flow_id }.fail()?,
                // `IF NOT EXISTS` without `OR REPLACE`: keep the old flow.
                (true, false, true) => {
                    info!("Flow with id={} already exists, do nothing", flow_id);
                    return Ok(None);
                }

                (_, _, false) => (),
            }
        }

        let query_ctx = query_ctx.context({
            UnexpectedSnafu {
                reason: "Query context is None".to_string(),
            }
        })?;
        let query_ctx = Arc::new(query_ctx);
        let is_tql = is_tql(query_ctx.sql_dialect(), &sql)
            .map_err(BoxedError::new)
            .context(CreateFlowSnafu { sql: &sql })?;

        // TQL flows have no SQL time-window expr to derive a schedule from,
        // so an explicit EVAL INTERVAL is mandatory.
        if eval_interval.is_none() && is_tql {
            InvalidQuerySnafu {
                reason: "TQL query requires EVAL INTERVAL to be set".to_string(),
            }
            .fail()?;
        }

        let flow_type = flow_options.get(FlowType::FLOW_TYPE_KEY);

        // This engine only handles batching flows; an absent flow type is
        // treated as batching.
        ensure!(
            match flow_type {
                None => true,
                Some(ty) if ty == FlowType::BATCHING => true,
                _ => false,
            },
            UnexpectedSnafu {
                reason: format!("Flow type is not batching nor None, got {flow_type:?}")
            }
        );

        let mut source_table_names = Vec::with_capacity(2);
        for src_id in source_table_ids {
            let table_name = get_table_name(self.table_meta.table_info_manager(), &src_id).await?;
            let table_info = get_table_info(self.table_meta.table_info_manager(), &src_id).await?;
            // Instant-TTL tables discard data immediately, so a batching
            // flow would never see the rows it needs to re-query.
            ensure!(
                table_info.table_info.meta.options.ttl != Some(TimeToLive::Instant),
                UnsupportedSnafu {
                    reason: format!(
                        "Source table `{}`(id={}) has instant TTL, Instant TTL is not supported under batching mode. Consider using a TTL longer than flush interval",
                        table_name.join("."),
                        src_id
                    ),
                }
            );

            source_table_names.push(table_name);
        }

        // Shutdown channel: `tx` is kept in `shutdown_txs`, `rx` is handed
        // to the task so its loop can be stopped on removal.
        let (tx, rx) = oneshot::channel();

        let plan = sql_to_df_plan(query_ctx.clone(), self.query_engine.clone(), &sql, true).await?;

        if is_tql {
            self.check_is_tql_table(&plan, &query_ctx).await?;
        }

        // Derive the physical time-window expression from the plan for SQL
        // flows; TQL flows run purely on the eval interval.
        let phy_expr = if !is_tql {
            let (column_name, time_window_expr, _, df_schema) = find_time_window_expr(
                &plan,
                self.query_engine.engine_state().catalog_manager().clone(),
                query_ctx.clone(),
            )
            .await?;
            time_window_expr
                .map(|expr| {
                    TimeWindowExpr::from_expr(
                        &expr,
                        &column_name,
                        &df_schema,
                        &self.query_engine.engine_state().session_state(),
                    )
                })
                .transpose()?
        } else {
            None
        };

        debug!(
            "Flow id={}, found time window expr={}",
            flow_id,
            phy_expr
                .as_ref()
                .map(|phy_expr| phy_expr.to_string())
                .unwrap_or("None".to_string())
        );

        let task_args = TaskArgs {
            flow_id,
            query: &sql,
            plan,
            time_window_expr: phy_expr,
            expire_after,
            sink_table_name,
            source_table_names,
            query_ctx,
            catalog_manager: self.catalog_manager.clone(),
            shutdown_rx: rx,
            batch_opts: self.batch_opts.clone(),
            flow_eval_interval: eval_interval.map(|secs| Duration::from_secs(secs as u64)),
        };

        let task = BatchingTask::try_new(task_args)?;

        let task_inner = task.clone();
        let engine = self.query_engine.clone();
        let frontend = self.frontend_client.clone();

        // Make sure the sink table exists before the loop starts writing.
        task.check_or_create_sink_table(&engine, &frontend).await?;

        // Run the flow's executing loop in the background and remember its
        // handle on the task state.
        let handle = common_runtime::spawn_global(async move {
            task_inner.start_executing_loop(engine, frontend).await;
        });
        task.state.write().unwrap().task_handle = Some(handle);

        // On replace, the old task is dropped here (its shutdown tx is also
        // replaced below).
        let replaced_old_task_opt = self.tasks.write().await.insert(flow_id, task);
        drop(replaced_old_task_opt);

        self.shutdown_txs.write().await.insert(flow_id, tx);

        Ok(Some(flow_id))
    }

    /// Validates that every table referenced by a TQL flow's plan has a
    /// prometheus-compatible schema: exactly one `f64` value column, and
    /// otherwise only timestamp columns and string primary-key (tag) columns
    /// (metric-engine internal columns are ignored).
    async fn check_is_tql_table(
        &self,
        query: &LogicalPlan,
        query_ctx: &QueryContext,
    ) -> Result<(), Error> {
        // Visitor that collects every table reference in the plan,
        // including those inside subqueries.
        struct CollectTableRef {
            table_refs: HashSet<datafusion_common::TableReference>,
        }

        impl TreeNodeVisitor<'_> for CollectTableRef {
            type Node = LogicalPlan;
            fn f_down(
                &mut self,
                node: &Self::Node,
            ) -> datafusion_common::Result<TreeNodeRecursion> {
                if let LogicalPlan::TableScan(scan) = node {
                    self.table_refs.insert(scan.table_name.clone());
                }
                Ok(TreeNodeRecursion::Continue)
            }
        }
        let mut table_refs = CollectTableRef {
            table_refs: HashSet::new(),
        };
        query
            .visit_with_subqueries(&mut table_refs)
            .context(DatafusionSnafu {
                context: "Checking if all source tables are TQL tables",
            })?;

        let default_catalog = query_ctx.current_catalog();
        let default_schema = query_ctx.current_schema();
        let default_schema = &default_schema;

        for table_ref in table_refs.table_refs {
            // Normalize bare/partial references against the session's
            // current catalog/schema.
            let table_ref = match &table_ref {
                datafusion_common::TableReference::Bare { table } => {
                    TableReference::full(default_catalog, default_schema, table)
                }
                datafusion_common::TableReference::Partial { schema, table } => {
                    TableReference::full(default_catalog, schema, table)
                }
                datafusion_common::TableReference::Full {
                    catalog,
                    schema,
                    table,
                } => TableReference::full(catalog, schema, table),
            };

            let table_id = self
                .table_meta
                .table_name_manager()
                .get(table_ref.into())
                .await
                .map_err(BoxedError::new)
                .context(ExternalSnafu)?
                .with_context(|| UnexpectedSnafu {
                    reason: format!("Failed to get table id for table: {}", table_ref),
                })?
                .table_id();
            let table_info =
                get_table_info(self.table_meta.table_info_manager(), &table_id).await?;
            // TQL semantics assume a single f64 value column per metric.
            let value_cols = table_info
                .table_info
                .meta
                .schema
                .column_schemas()
                .iter()
                .filter(|col| col.data_type == ConcreteDataType::float64_datatype())
                .collect::<Vec<_>>();
            ensure!(
                value_cols.len() == 1,
                InvalidQuerySnafu {
                    reason: format!(
                        "TQL query only supports one f64 value column, table `{}`(id={}) has {} f64 value columns, columns are: {:?}",
                        table_ref,
                        table_id,
                        value_cols.len(),
                        value_cols
                    ),
                }
            );
            let pk_idxs = table_info
                .table_info
                .meta
                .primary_key_indices
                .iter()
                .collect::<HashSet<_>>();

            for (idx, col) in table_info
                .table_info
                .meta
                .schema
                .column_schemas()
                .iter()
                .enumerate()
            {
                // Metric-engine bookkeeping columns are exempt from the check.
                if is_metric_engine_internal_column(&col.name) {
                    continue;
                }
                let is_pk: bool = pk_idxs.contains(&&idx);

                // Every remaining column must be the f64 value, a timestamp,
                // or a string tag that is part of the primary key.
                ensure!(
                    col.data_type == ConcreteDataType::float64_datatype()
                        || col.data_type.is_timestamp()
                        || (col.data_type == ConcreteDataType::string_datatype() && is_pk),
                    InvalidQuerySnafu {
                        reason: format!(
                            "TQL query only supports f64 value column, timestamp column and string tag columns, table `{}`(id={}) has column `{}` with type {:?} which is not supported",
                            table_ref, table_id, col.name, col.data_type
                        ),
                    }
                );
            }
        }
        Ok(())
    }

    /// Removes a flow's task and signals its executing loop to shut down.
    ///
    /// Errors if the flow is unknown; a dropped shutdown receiver is only
    /// logged, since it means the loop is already gone.
    pub async fn remove_flow_inner(&self, flow_id: FlowId) -> Result<(), Error> {
        if self.tasks.write().await.remove(&flow_id).is_none() {
            warn!("Flow {flow_id} not found in tasks");
            FlowNotFoundSnafu { id: flow_id }.fail()?;
        }
        let Some(tx) = self.shutdown_txs.write().await.remove(&flow_id) else {
            // `tasks` and `shutdown_txs` are kept in lockstep, so a missing
            // sender here indicates internal inconsistency.
            UnexpectedSnafu {
                reason: format!("Can't found shutdown tx for flow {flow_id}"),
            }
            .fail()?
        };
        if tx.send(()).is_err() {
            warn!(
                "Fail to shutdown flow {flow_id} due to receiver already dropped, maybe flow {flow_id} is already dropped?"
            )
        }
        Ok(())
    }

    /// Executes a flow once, immediately, over its currently-dirty time
    /// windows, returning the number of affected rows.
    pub async fn flush_flow_inner(&self, flow_id: FlowId) -> Result<usize, Error> {
        debug!("Try flush flow {flow_id}");
        // NOTE(review): presumably gives in-flight inserts time to mark their
        // dirty windows before we snapshot the dirty count — TODO confirm.
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
        let task = self.tasks.read().await.get(&flow_id).cloned();
        let task = task.with_context(|| FlowNotFoundSnafu { id: flow_id })?;

        let time_window_size = task
            .config
            .time_window_expr
            .as_ref()
            .and_then(|expr| *expr.time_window_size());

        // Snapshot how many dirty windows are currently effective; `None`
        // when the flow has no time window expression.
        let cur_dirty_window_cnt = time_window_size.map(|time_window_size| {
            task.state
                .read()
                .unwrap()
                .dirty_time_windows
                .effective_count(&time_window_size)
        });

        let res = task
            .gen_exec_once(
                &self.query_engine,
                &self.frontend_client,
                cur_dirty_window_cnt,
            )
            .await?;

        // `None` result means nothing needed to run; report zero rows.
        let affected_rows = res.map(|(r, _)| r).unwrap_or_default() as usize;
        debug!(
            "Successfully flush flow {flow_id}, affected rows={}",
            affected_rows
        );
        Ok(affected_rows)
    }

    /// Returns whether a task is currently registered for `flow_id`.
    pub async fn flow_exist_inner(&self, flow_id: FlowId) -> bool {
        self.tasks.read().await.contains_key(&flow_id)
    }
}
729
/// [`FlowEngine`] facade: every trait method delegates directly to the
/// corresponding `*_inner` method on [`BatchingEngine`].
impl FlowEngine for BatchingEngine {
    async fn create_flow(&self, args: CreateFlowArgs) -> Result<Option<FlowId>, Error> {
        self.create_flow_inner(args).await
    }
    async fn remove_flow(&self, flow_id: FlowId) -> Result<(), Error> {
        self.remove_flow_inner(flow_id).await
    }
    async fn flush_flow(&self, flow_id: FlowId) -> Result<usize, Error> {
        self.flush_flow_inner(flow_id).await
    }
    async fn flow_exist(&self, flow_id: FlowId) -> Result<bool, Error> {
        Ok(self.flow_exist_inner(flow_id).await)
    }
    async fn list_flows(&self) -> Result<impl IntoIterator<Item = FlowId>, Error> {
        // Snapshot the ids so the lock isn't held by the returned iterator.
        Ok(self.tasks.read().await.keys().cloned().collect::<Vec<_>>())
    }
    async fn handle_flow_inserts(
        &self,
        request: api::v1::region::InsertRequests,
    ) -> Result<(), Error> {
        self.handle_inserts_inner(request).await
    }
    async fn handle_mark_window_dirty(
        &self,
        req: api::v1::flow::DirtyWindowRequests,
    ) -> Result<(), Error> {
        self.handle_mark_dirty_time_window(req).await
    }
}