use std::collections::{BTreeMap, HashMap, HashSet};
use std::sync::Arc;

use api::v1::flow::{DirtyWindowRequests, FlowResponse};
use catalog::CatalogManagerRef;
use common_error::ext::BoxedError;
use common_meta::ddl::create_flow::FlowType;
use common_meta::key::flow::FlowMetadataManagerRef;
use common_meta::key::table_info::{TableInfoManager, TableInfoValue};
use common_meta::key::TableMetadataManagerRef;
use common_runtime::JoinHandle;
use common_telemetry::tracing::warn;
use common_telemetry::{debug, info};
use common_time::TimeToLive;
use query::QueryEngineRef;
use snafu::{ensure, OptionExt, ResultExt};
use store_api::storage::{RegionId, TableId};
use tokio::sync::{oneshot, RwLock};

use crate::batching_mode::frontend_client::FrontendClient;
use crate::batching_mode::task::{BatchingTask, TaskArgs};
use crate::batching_mode::time_window::{find_time_window_expr, TimeWindowExpr};
use crate::batching_mode::utils::sql_to_df_plan;
use crate::batching_mode::BatchingModeOptions;
use crate::engine::FlowEngine;
use crate::error::{
    ExternalSnafu, FlowAlreadyExistSnafu, FlowNotFoundSnafu, TableNotFoundMetaSnafu,
    UnexpectedSnafu, UnsupportedSnafu,
};
use crate::metrics::METRIC_FLOW_BATCHING_ENGINE_BULK_MARK_TIME_WINDOW;
use crate::{CreateFlowArgs, Error, FlowId, TableName};

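/// The engine for batching mode flows. Instead of computing results
/// incrementally, each flow is executed as a periodic batch query against the
/// frontend, driven by the dirty time windows recorded from incoming inserts.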
pub struct BatchingEngine {
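    /// Running flow tasks, keyed by flow id.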
    tasks: RwLock<BTreeMap<FlowId, BatchingTask>>,
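    /// Shutdown senders for signaling each task's execution loop to stop, keyed by flow id.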
    shutdown_txs: RwLock<BTreeMap<FlowId, oneshot::Sender<()>>>,
    pub(crate) frontend_client: Arc<FrontendClient>,
    flow_metadata_manager: FlowMetadataManagerRef,
    table_meta: TableMetadataManagerRef,
    catalog_manager: CatalogManagerRef,
    query_engine: QueryEngineRef,
    pub(crate) batch_opts: Arc<BatchingModeOptions>,
}

impl BatchingEngine {
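    /// Creates a new `BatchingEngine` with no registered flows.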
    pub fn new(
        frontend_client: Arc<FrontendClient>,
        query_engine: QueryEngineRef,
        flow_metadata_manager: FlowMetadataManagerRef,
        table_meta: TableMetadataManagerRef,
        catalog_manager: CatalogManagerRef,
        batch_opts: BatchingModeOptions,
    ) -> Self {
        Self {
            tasks: Default::default(),
            shutdown_txs: Default::default(),
            frontend_client,
            flow_metadata_manager,
            table_meta,
            catalog_manager,
            query_engine,
            batch_opts: Arc::new(batch_opts),
        }
    }

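    /// Marks the given timestamps as dirty time windows for every flow that
    /// reads from the affected source tables, so the next execution of those
    /// flows knows which windows to recompute.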
    pub async fn handle_mark_dirty_time_window(
        &self,
        reqs: DirtyWindowRequests,
    ) -> Result<FlowResponse, Error> {
        let table_info_mgr = self.table_meta.table_info_manager();

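        // Group dirty timestamps by table id so table info can be fetched in one batch.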
        let mut group_by_table_id: HashMap<TableId, Vec<_>> = HashMap::new();
        for r in reqs.requests {
            let tid = TableId::from(r.table_id);
            let entry = group_by_table_id.entry(tid).or_default();
            entry.extend(r.timestamps);
        }
        let tids = group_by_table_id.keys().cloned().collect::<Vec<TableId>>();
        let table_infos =
            table_info_mgr
                .batch_get(&tids)
                .await
                .with_context(|_| TableNotFoundMetaSnafu {
                    msg: format!("Failed to get table info for table ids: {:?}", tids),
                })?;

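        // Map table ids to fully qualified `[catalog, schema, table]` names and
        // resolve the time unit of each table's time index column.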
        let group_by_table_name = group_by_table_id
            .into_iter()
            .filter_map(|(id, timestamps)| {
                let Some(info) = table_infos.get(&id) else {
                    warn!("Failed to get table info for table id: {:?}", id);
                    return None;
                };
                let table_name = info.table_name();
                let table_name = [
                    table_name.catalog_name,
                    table_name.schema_name,
                    table_name.table_name,
                ];
                let schema = &info.table_info.meta.schema;
                // Every table is expected to have a time index column, hence the unwraps.
                let time_index_unit = schema.column_schemas[schema.timestamp_index.unwrap()]
                    .data_type
                    .as_timestamp()
                    .unwrap()
                    .unit();
                Some((table_name, (timestamps, time_index_unit)))
            })
            .collect::<HashMap<_, _>>();

        let group_by_table_name = Arc::new(group_by_table_name);

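        // For every flow whose source tables overlap the dirty tables, spawn a
        // task that aligns each dirty timestamp to the start of its time window.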
        let mut handles = Vec::new();
        let tasks = self.tasks.read().await;

        for (_flow_id, task) in tasks.iter() {
            let src_table_names = &task.config.source_table_names;

            if src_table_names
                .iter()
                .all(|name| !group_by_table_name.contains_key(name))
            {
                continue;
            }

            let group_by_table_name = group_by_table_name.clone();
            let task = task.clone();

            let handle: JoinHandle<Result<(), Error>> = tokio::spawn(async move {
                let src_table_names = &task.config.source_table_names;
                let mut all_dirty_windows = HashSet::new();
                for src_table_name in src_table_names {
                    if let Some((timestamps, unit)) = group_by_table_name.get(src_table_name) {
                        let Some(expr) = &task.config.time_window_expr else {
                            continue;
                        };
                        for timestamp in timestamps {
                            // Align the timestamp to the start of its time window.
                            let align_start = expr
                                .eval(common_time::Timestamp::new(*timestamp, *unit))?
                                .0
                                .context(UnexpectedSnafu {
                                    reason: "Failed to eval start value",
                                })?;
                            all_dirty_windows.insert(align_start);
                        }
                    }
                }
                let mut state = task.state.write().unwrap();
                let flow_id_label = task.config.flow_id.to_string();
                for timestamp in all_dirty_windows {
                    state.dirty_time_windows.add_window(timestamp, None);
                }

                METRIC_FLOW_BATCHING_ENGINE_BULK_MARK_TIME_WINDOW
                    .with_label_values(&[&flow_id_label])
                    .set(state.dirty_time_windows.len() as f64);
                Ok(())
            });
            handles.push(handle);
        }
        // Release the read lock before awaiting the spawned tasks.
        drop(tasks);
        for handle in handles {
            match handle.await {
                Err(e) => {
                    warn!("Failed to mark dirty time windows: {e}");
                }
                Ok(Ok(())) => (),
                Ok(Err(e)) => {
                    warn!("Failed to mark dirty time windows: {e}");
                }
            }
        }

        Ok(Default::default())
    }

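    /// Inspects insert requests and records the time windows they touch as
    /// dirty for every flow that reads from the affected source tables. Only
    /// the involved time windows are recorded, not the rows themselves.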
    pub async fn handle_inserts_inner(
        &self,
        request: api::v1::region::InsertRequests,
    ) -> Result<(), Error> {
        let table_info_mgr = self.table_meta.table_info_manager();
        let mut group_by_table_id: HashMap<TableId, Vec<api::v1::Rows>> = HashMap::new();

        for r in request.requests {
            let tid = RegionId::from(r.region_id).table_id();
            let entry = group_by_table_id.entry(tid).or_default();
            if let Some(rows) = r.rows {
                entry.push(rows);
            }
        }

        let tids = group_by_table_id.keys().cloned().collect::<Vec<TableId>>();
        let table_infos =
            table_info_mgr
                .batch_get(&tids)
                .await
                .with_context(|_| TableNotFoundMetaSnafu {
                    msg: format!("Failed to get table info for table ids: {:?}", tids),
                })?;

        let missing_tids = tids
            .iter()
            .filter(|id| !table_infos.contains_key(id))
            .collect::<Vec<_>>();
        if !missing_tids.is_empty() {
            warn!(
                "Failed to get all the table info for table ids, expected table ids: {:?}, these tables don't exist: {:?}",
                tids, missing_tids
            );
        }

        let group_by_table_name = group_by_table_id
            .into_iter()
            .filter_map(|(id, rows)| {
                let table_name = table_infos.get(&id).map(|info| info.table_name());
                let Some(table_name) = table_name else {
                    warn!("Failed to get table info for table id: {:?}", id);
                    return None;
                };
                let table_name = [
                    table_name.catalog_name,
                    table_name.schema_name,
                    table_name.table_name,
                ];
                Some((table_name, rows))
            })
            .collect::<HashMap<_, _>>();

        let group_by_table_name = Arc::new(group_by_table_name);

        let mut handles = Vec::new();
        let tasks = self.tasks.read().await;
        for (_flow_id, task) in tasks.iter() {
            let src_table_names = &task.config.source_table_names;

            if src_table_names
                .iter()
                .all(|name| !group_by_table_name.contains_key(name))
            {
                continue;
            }

            let group_by_table_name = group_by_table_name.clone();
            let task = task.clone();

            let handle: JoinHandle<Result<(), Error>> = tokio::spawn(async move {
                let src_table_names = &task.config.source_table_names;

                for src_table_name in src_table_names {
                    if let Some(entry) = group_by_table_name.get(src_table_name) {
                        let Some(expr) = &task.config.time_window_expr else {
                            continue;
                        };
                        // Derive the set of time windows the inserted rows fall into.
                        let involved_time_windows = expr.handle_rows(entry.clone()).await?;
                        let mut state = task.state.write().unwrap();
                        state
                            .dirty_time_windows
                            .add_lower_bounds(involved_time_windows.into_iter());
                    }
                }
                Ok(())
            });
            handles.push(handle);
        }

        // Release the read lock before awaiting the spawned tasks,
        // mirroring handle_mark_dirty_time_window above.
        drop(tasks);

        for handle in handles {
            match handle.await {
                Err(e) => {
                    warn!("Failed to handle inserts: {e}");
                }
                Ok(Ok(())) => (),
                Ok(Err(e)) => {
                    warn!("Failed to handle inserts: {e}");
                }
            }
        }

        Ok(())
    }
}

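/// Resolves a table id to its fully qualified `[catalog, schema, table]` name.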
async fn get_table_name(
    table_info: &TableInfoManager,
    table_id: &TableId,
) -> Result<TableName, Error> {
    get_table_info(table_info, table_id).await.map(|info| {
        let name = info.table_name();
        [name.catalog_name, name.schema_name, name.table_name]
    })
}

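/// Fetches the table info for a table id, or errors if the table doesn't exist.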
async fn get_table_info(
    table_info: &TableInfoManager,
    table_id: &TableId,
) -> Result<TableInfoValue, Error> {
    table_info
        .get(*table_id)
        .await
        .map_err(BoxedError::new)
        .context(ExternalSnafu)?
        .with_context(|| UnexpectedSnafu {
            reason: format!("Couldn't find table info for table id = {:?}", table_id),
        })
        .map(|info| info.into_inner())
}

impl BatchingEngine {
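    /// Creates a batching mode flow task, ensures its sink table exists, and
    /// starts its execution loop. Returns `Ok(None)` if the flow already
    /// exists and `create_if_not_exists` is set without `or_replace`.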
    pub async fn create_flow_inner(&self, args: CreateFlowArgs) -> Result<Option<FlowId>, Error> {
        let CreateFlowArgs {
            flow_id,
            sink_table_name,
            source_table_ids,
            create_if_not_exists,
            or_replace,
            expire_after,
            comment: _,
            sql,
            flow_options,
            query_ctx,
        } = args;

        {
            let is_exist = self.tasks.read().await.contains_key(&flow_id);
            // Decide what to do based on (create_if_not_exists, or_replace, is_exist).
            match (create_if_not_exists, or_replace, is_exist) {
                (_, true, true) => {
                    info!("Replacing flow with id={}", flow_id);
                }
                (false, false, true) => FlowAlreadyExistSnafu { id: flow_id }.fail()?,
                (true, false, true) => {
                    info!("Flow with id={} already exists, do nothing", flow_id);
                    return Ok(None);
                }
                (_, _, false) => (),
            }
        }

        let flow_type = flow_options.get(FlowType::FLOW_TYPE_KEY);

        ensure!(
            match flow_type {
                None => true,
                Some(ty) if ty == FlowType::BATCHING => true,
                _ => false,
            },
            UnexpectedSnafu {
                reason: format!("Flow type must be batching or unset, got {flow_type:?}")
            }
        );

        let Some(query_ctx) = query_ctx else {
            UnexpectedSnafu {
                reason: "Query context is None".to_string(),
            }
            .fail()?
        };
        let query_ctx = Arc::new(query_ctx);
        let mut source_table_names = Vec::with_capacity(2);
        for src_id in source_table_ids {
            let table_name = get_table_name(self.table_meta.table_info_manager(), &src_id).await?;
            let table_info = get_table_info(self.table_meta.table_info_manager(), &src_id).await?;
            ensure!(
                table_info.table_info.meta.options.ttl != Some(TimeToLive::Instant),
                UnsupportedSnafu {
                    reason: format!(
                        "Source table `{}`(id={}) has instant TTL, which is not supported in batching mode. Consider using a TTL longer than the flush interval",
                        table_name.join("."),
                        src_id
                    ),
                }
            );

            source_table_names.push(table_name);
        }

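        // Channel used later to signal this task's execution loop to shut down.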
        let (tx, rx) = oneshot::channel();

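        // Plan the query once and try to derive a time window expression from
        // it; the expression maps timestamps to the time windows that executions cover.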
        let plan = sql_to_df_plan(query_ctx.clone(), self.query_engine.clone(), &sql, true).await?;
        let (column_name, time_window_expr, _, df_schema) = find_time_window_expr(
            &plan,
            self.query_engine.engine_state().catalog_manager().clone(),
            query_ctx.clone(),
        )
        .await?;

        let phy_expr = time_window_expr
            .map(|expr| {
                TimeWindowExpr::from_expr(
                    &expr,
                    &column_name,
                    &df_schema,
                    &self.query_engine.engine_state().session_state(),
                )
            })
            .transpose()?;

        debug!(
            "Flow id={}, found time window expr={}",
            flow_id,
            phy_expr
                .as_ref()
                .map(|phy_expr| phy_expr.to_string())
                .unwrap_or("None".to_string())
        );

        let task_args = TaskArgs {
            flow_id,
            query: &sql,
            plan,
            time_window_expr: phy_expr,
            expire_after,
            sink_table_name,
            source_table_names,
            query_ctx,
            catalog_manager: self.catalog_manager.clone(),
            shutdown_rx: rx,
            batch_opts: self.batch_opts.clone(),
        };

        let task = BatchingTask::try_new(task_args)?;

        let task_inner = task.clone();
        let engine = self.query_engine.clone();
        let frontend = self.frontend_client.clone();

        task.check_or_create_sink_table(&engine, &frontend).await?;

        let handle = common_runtime::spawn_global(async move {
            task_inner.start_executing_loop(engine, frontend).await;
        });
        task.state.write().unwrap().task_handle = Some(handle);

        // Replacing the map entry drops the old task (if any); its execution
        // loop is expected to stop once its shutdown sender is replaced below.
        let replaced_old_task_opt = self.tasks.write().await.insert(flow_id, task);
        drop(replaced_old_task_opt);

        self.shutdown_txs.write().await.insert(flow_id, tx);

        Ok(Some(flow_id))
    }

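    /// Removes a flow task and signals its execution loop to shut down.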
    pub async fn remove_flow_inner(&self, flow_id: FlowId) -> Result<(), Error> {
        if self.tasks.write().await.remove(&flow_id).is_none() {
            warn!("Flow {flow_id} not found in tasks");
            FlowNotFoundSnafu { id: flow_id }.fail()?;
        }
        let Some(tx) = self.shutdown_txs.write().await.remove(&flow_id) else {
            UnexpectedSnafu {
                reason: format!("Can't find shutdown tx for flow {flow_id}"),
            }
            .fail()?
        };
        if tx.send(()).is_err() {
            warn!("Failed to shut down flow {flow_id}: the receiver is already dropped, maybe flow {flow_id} was already removed?")
        }
        Ok(())
    }

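    /// Triggers one execution of the flow over its current dirty time windows
    /// and returns the number of affected rows.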
    pub async fn flush_flow_inner(&self, flow_id: FlowId) -> Result<usize, Error> {
        debug!("Try flush flow {flow_id}");
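        // Wait briefly so that in-flight inserts have a chance to mark their
        // dirty time windows before this flush runs.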
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
        let task = self.tasks.read().await.get(&flow_id).cloned();
        let task = task.with_context(|| FlowNotFoundSnafu { id: flow_id })?;

        let time_window_size = task
            .config
            .time_window_expr
            .as_ref()
            .and_then(|expr| *expr.time_window_size());

        let cur_dirty_window_cnt = time_window_size.map(|time_window_size| {
            task.state
                .read()
                .unwrap()
                .dirty_time_windows
                .effective_count(&time_window_size)
        });

        let res = task
            .gen_exec_once(
                &self.query_engine,
                &self.frontend_client,
                cur_dirty_window_cnt,
            )
            .await?;

        let affected_rows = res.map(|(r, _)| r).unwrap_or_default() as usize;
        debug!(
            "Successfully flushed flow {flow_id}, affected rows={}",
            affected_rows
        );
        Ok(affected_rows)
    }

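    /// Returns whether a task for the given flow id is currently registered.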
    pub async fn flow_exist_inner(&self, flow_id: FlowId) -> bool {
        self.tasks.read().await.contains_key(&flow_id)
    }
}

impl FlowEngine for BatchingEngine {
    async fn create_flow(&self, args: CreateFlowArgs) -> Result<Option<FlowId>, Error> {
        self.create_flow_inner(args).await
    }

    async fn remove_flow(&self, flow_id: FlowId) -> Result<(), Error> {
        self.remove_flow_inner(flow_id).await
    }

    async fn flush_flow(&self, flow_id: FlowId) -> Result<usize, Error> {
        self.flush_flow_inner(flow_id).await
    }

    async fn flow_exist(&self, flow_id: FlowId) -> Result<bool, Error> {
        Ok(self.flow_exist_inner(flow_id).await)
    }

    async fn list_flows(&self) -> Result<impl IntoIterator<Item = FlowId>, Error> {
        Ok(self.tasks.read().await.keys().cloned().collect::<Vec<_>>())
    }

    async fn handle_flow_inserts(
        &self,
        request: api::v1::region::InsertRequests,
    ) -> Result<(), Error> {
        self.handle_inserts_inner(request).await
    }
}