1use std::collections::{BTreeSet, HashMap, HashSet};
16use std::sync::{Arc, RwLock};
17use std::time::{Duration, SystemTime, UNIX_EPOCH};
18
19use api::v1::CreateTableExpr;
20use catalog::CatalogManagerRef;
21use common_error::ext::BoxedError;
22use common_query::logical_plan::breakup_insert_plan;
23use common_telemetry::tracing::warn;
24use common_telemetry::{debug, info};
25use common_time::Timestamp;
26use datafusion::datasource::DefaultTableSource;
27use datafusion::sql::unparser::expr_to_sql;
28use datafusion_common::DFSchemaRef;
29use datafusion_common::tree_node::{Transformed, TreeNode};
30use datafusion_expr::{DmlStatement, LogicalPlan, WriteOp};
31use datatypes::prelude::ConcreteDataType;
32use datatypes::schema::{ColumnSchema, Schema};
33use operator::expr_helper::column_schemas_to_defs;
34use query::QueryEngineRef;
35use query::query_engine::DefaultSerializer;
36use session::context::QueryContextRef;
37use snafu::{OptionExt, ResultExt, ensure};
38use sql::parser::{ParseOptions, ParserContext};
39use sql::statements::statement::Statement;
40use store_api::mito_engine_options::MERGE_MODE_KEY;
41use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
42use table::table::adapter::DfTableProviderAdapter;
43use tokio::sync::oneshot;
44use tokio::sync::oneshot::error::TryRecvError;
45use tokio::time::Instant;
46
47use crate::adapter::{AUTO_CREATED_PLACEHOLDER_TS_COL, AUTO_CREATED_UPDATE_AT_TS_COL};
48use crate::batching_mode::BatchingModeOptions;
49use crate::batching_mode::frontend_client::FrontendClient;
50use crate::batching_mode::state::{FilterExprInfo, TaskState};
51use crate::batching_mode::time_window::TimeWindowExpr;
52use crate::batching_mode::utils::{
53 AddFilterRewriter, ColumnMatcherRewriter, FindGroupByFinalName, gen_plan_with_matching_schema,
54 get_table_info_df_schema, sql_to_df_plan,
55};
56use crate::df_optimizer::apply_df_optimizer;
57use crate::error::{
58 ConvertColumnSchemaSnafu, DatafusionSnafu, ExternalSnafu, InvalidQuerySnafu,
59 SubstraitEncodeLogicalPlanSnafu, UnexpectedSnafu,
60};
61use crate::metrics::{
62 METRIC_FLOW_BATCHING_ENGINE_ERROR_CNT, METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME,
63 METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY, METRIC_FLOW_BATCHING_ENGINE_START_QUERY_CNT,
64 METRIC_FLOW_ROWS,
65};
66use crate::{Error, FlowId};
67
/// Immutable configuration of a batching-mode flow task, shared via `Arc`.
#[derive(Clone)]
pub struct TaskConfig {
    /// Id of the flow this task belongs to.
    pub flow_id: FlowId,
    /// The original query text (SQL or TQL) of the flow.
    pub query: String,
    /// Output schema of the flow's query plan (captured from the plan at construction).
    pub output_schema: DFSchemaRef,
    /// Expression used to derive time-window bounds for incremental recompute,
    /// if the query has a usable time window.
    pub time_window_expr: Option<TimeWindowExpr>,
    /// Expiry in seconds; data older than `now - expire_after` is not recomputed.
    /// `None` means nothing expires.
    pub expire_after: Option<i64>,
    /// Fully qualified sink table name as `[catalog, schema, table]`.
    pub sink_table_name: [String; 3],
    /// Fully qualified names of all source tables this flow reads from.
    pub source_table_names: HashSet<[String; 3]>,
    /// Catalog manager used for table existence/schema lookups.
    pub catalog_manager: CatalogManagerRef,
    /// Whether the query is SQL or TQL (affects planning and sink table creation).
    pub query_type: QueryType,
    /// Engine-wide batching-mode options (timeouts, refresh limits, etc.).
    pub batch_opts: Arc<BatchingModeOptions>,
    /// Optional fixed evaluation interval; when set, the executing loop is
    /// paced by a tokio interval instead of adaptive scheduling.
    pub flow_eval_interval: Option<Duration>,
}
85
86fn determine_query_type(query: &str, query_ctx: &QueryContextRef) -> Result<QueryType, Error> {
87 let stmts =
88 ParserContext::create_with_dialect(query, query_ctx.sql_dialect(), ParseOptions::default())
89 .map_err(BoxedError::new)
90 .context(ExternalSnafu)?;
91
92 ensure!(
93 stmts.len() == 1,
94 InvalidQuerySnafu {
95 reason: format!("Expect only one statement, found {}", stmts.len())
96 }
97 );
98 let stmt = &stmts[0];
99 match stmt {
100 Statement::Tql(_) => Ok(QueryType::Tql),
101 _ => Ok(QueryType::Sql),
102 }
103}
104
105fn is_merge_mode_last_non_null(options: &HashMap<String, String>) -> bool {
106 options
107 .get(MERGE_MODE_KEY)
108 .map(|mode| mode.eq_ignore_ascii_case("last_non_null"))
109 .unwrap_or(false)
110}
111
/// Kind of query text driving a flow.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum QueryType {
    /// A TQL query.
    Tql,
    /// A SQL query.
    Sql,
}
119
/// A flow task evaluated in batching mode: immutable configuration plus
/// mutable execution state shared behind a lock.
#[derive(Clone)]
pub struct BatchingTask {
    /// Shared, immutable task configuration.
    pub config: Arc<TaskConfig>,
    /// Mutable state (query context, dirty time windows, shutdown channel).
    pub state: Arc<RwLock<TaskState>>,
}
125
/// Argument bundle consumed by [`BatchingTask::try_new`].
pub struct TaskArgs<'a> {
    /// Id of the flow.
    pub flow_id: FlowId,
    /// The original query text.
    pub query: &'a str,
    /// Planned form of `query`; its schema becomes the task's output schema.
    pub plan: LogicalPlan,
    /// Optional time window expression for incremental recompute.
    pub time_window_expr: Option<TimeWindowExpr>,
    /// Expiry in seconds; `None` means no expiry.
    pub expire_after: Option<i64>,
    /// Sink table name as `[catalog, schema, table]`.
    pub sink_table_name: [String; 3],
    /// Source table names; deduplicated into a set by `try_new`.
    pub source_table_names: Vec<[String; 3]>,
    /// Query context stored into the task state.
    pub query_ctx: QueryContextRef,
    /// Catalog manager for table lookups.
    pub catalog_manager: CatalogManagerRef,
    /// Receiver signaled to stop the executing loop.
    pub shutdown_rx: oneshot::Receiver<()>,
    /// Engine-wide batching-mode options.
    pub batch_opts: Arc<BatchingModeOptions>,
    /// Optional fixed evaluation interval.
    pub flow_eval_interval: Option<Duration>,
}
141
/// A generated query plan together with the dirty-window filter (if any)
/// that was folded into it.
pub struct PlanInfo {
    /// The plan to execute.
    pub plan: LogicalPlan,
    /// Filter describing which time ranges the plan covers; `None` when the
    /// whole query is rerun. On execution failure the ranges are re-added to
    /// the dirty set so the data is retried.
    pub filter: Option<FilterExprInfo>,
}
146
147impl BatchingTask {
148 #[allow(clippy::too_many_arguments)]
149 pub fn try_new(
150 TaskArgs {
151 flow_id,
152 query,
153 plan,
154 time_window_expr,
155 expire_after,
156 sink_table_name,
157 source_table_names,
158 query_ctx,
159 catalog_manager,
160 shutdown_rx,
161 batch_opts,
162 flow_eval_interval,
163 }: TaskArgs<'_>,
164 ) -> Result<Self, Error> {
165 Ok(Self {
166 config: Arc::new(TaskConfig {
167 flow_id,
168 query: query.to_string(),
169 time_window_expr,
170 expire_after,
171 sink_table_name,
172 source_table_names: source_table_names.into_iter().collect(),
173 catalog_manager,
174 output_schema: plan.schema().clone(),
175 query_type: determine_query_type(query, &query_ctx)?,
176 batch_opts,
177 flow_eval_interval,
178 }),
179 state: Arc::new(RwLock::new(TaskState::new(query_ctx, shutdown_rx))),
180 })
181 }
182
183 pub fn last_execution_time_millis(&self) -> Option<i64> {
184 self.state.read().unwrap().last_execution_time_millis()
185 }
186
187 pub fn mark_all_windows_as_dirty(&self) -> Result<(), Error> {
191 let now = SystemTime::now();
192 let now = Timestamp::new_second(
193 now.duration_since(UNIX_EPOCH)
194 .expect("Time went backwards")
195 .as_secs() as _,
196 );
197 let lower_bound = self
198 .config
199 .expire_after
200 .map(|e| now.sub_duration(Duration::from_secs(e as _)))
201 .transpose()
202 .map_err(BoxedError::new)
203 .context(ExternalSnafu)?
204 .unwrap_or(Timestamp::new_second(0));
205 debug!(
206 "Flow {} mark range ({:?}, {:?}) as dirty",
207 self.config.flow_id, lower_bound, now
208 );
209 self.state
210 .write()
211 .unwrap()
212 .dirty_time_windows
213 .add_window(lower_bound, Some(now));
214 Ok(())
215 }
216
217 pub async fn check_or_create_sink_table(
219 &self,
220 engine: &QueryEngineRef,
221 frontend_client: &Arc<FrontendClient>,
222 ) -> Result<Option<(u32, Duration)>, Error> {
223 if !self.is_table_exist(&self.config.sink_table_name).await? {
224 let create_table = self.gen_create_table_expr(engine.clone()).await?;
225 info!(
226 "Try creating sink table(if not exists) with expr: {:?}",
227 create_table
228 );
229 self.create_table(frontend_client, create_table).await?;
230 info!(
231 "Sink table {}(if not exists) created",
232 self.config.sink_table_name.join(".")
233 );
234 }
235
236 Ok(None)
237 }
238
239 async fn is_table_exist(&self, table_name: &[String; 3]) -> Result<bool, Error> {
240 self.config
241 .catalog_manager
242 .table_exists(&table_name[0], &table_name[1], &table_name[2], None)
243 .await
244 .map_err(BoxedError::new)
245 .context(ExternalSnafu)
246 }
247
248 pub async fn gen_exec_once(
249 &self,
250 engine: &QueryEngineRef,
251 frontend_client: &Arc<FrontendClient>,
252 max_window_cnt: Option<usize>,
253 ) -> Result<Option<(u32, Duration)>, Error> {
254 if let Some(new_query) = self.gen_insert_plan(engine, max_window_cnt).await? {
255 debug!("Generate new query: {}", new_query.plan);
256 self.execute_logical_plan(frontend_client, &new_query.plan)
257 .await
258 } else {
259 debug!("Generate no query");
260 Ok(None)
261 }
262 }
263
264 pub async fn gen_insert_plan(
265 &self,
266 engine: &QueryEngineRef,
267 max_window_cnt: Option<usize>,
268 ) -> Result<Option<PlanInfo>, Error> {
269 let (table, df_schema) = get_table_info_df_schema(
270 self.config.catalog_manager.clone(),
271 self.config.sink_table_name.clone(),
272 )
273 .await?;
274
275 let table_meta = &table.table_info().meta;
276 let merge_mode_last_non_null =
277 is_merge_mode_last_non_null(&table_meta.options.extra_options);
278 let primary_key_indices = table_meta.primary_key_indices.clone();
279
280 let new_query = self
281 .gen_query_with_time_window(
282 engine.clone(),
283 &table.table_info().meta.schema,
284 &primary_key_indices,
285 merge_mode_last_non_null,
286 max_window_cnt,
287 )
288 .await?;
289
290 let insert_into_info = if let Some(new_query) = new_query {
291 let table_columns = df_schema
294 .columns()
295 .into_iter()
296 .map(|c| c.name)
297 .collect::<BTreeSet<_>>();
298 for column in new_query.plan.schema().columns() {
299 ensure!(
300 table_columns.contains(column.name()),
301 InvalidQuerySnafu {
302 reason: format!(
303 "Column {} not found in sink table with columns {:?}",
304 column, table_columns
305 ),
306 }
307 );
308 }
309
310 let table_provider = Arc::new(DfTableProviderAdapter::new(table));
311 let table_source = Arc::new(DefaultTableSource::new(table_provider));
312
313 let plan = LogicalPlan::Dml(DmlStatement::new(
315 datafusion_common::TableReference::Full {
316 catalog: self.config.sink_table_name[0].clone().into(),
317 schema: self.config.sink_table_name[1].clone().into(),
318 table: self.config.sink_table_name[2].clone().into(),
319 },
320 table_source,
321 WriteOp::Insert(datafusion_expr::dml::InsertOp::Append),
322 Arc::new(new_query.plan),
323 ));
324 PlanInfo {
325 plan,
326 filter: new_query.filter,
327 }
328 } else {
329 return Ok(None);
330 };
331 let insert_into = insert_into_info
332 .plan
333 .recompute_schema()
334 .context(DatafusionSnafu {
335 context: "Failed to recompute schema",
336 })?;
337
338 Ok(Some(PlanInfo {
339 plan: insert_into,
340 filter: insert_into_info.filter,
341 }))
342 }
343
344 pub async fn create_table(
345 &self,
346 frontend_client: &Arc<FrontendClient>,
347 expr: CreateTableExpr,
348 ) -> Result<(), Error> {
349 let catalog = &self.config.sink_table_name[0];
350 let schema = &self.config.sink_table_name[1];
351 frontend_client
352 .create(expr.clone(), catalog, schema)
353 .await?;
354 Ok(())
355 }
356
357 pub async fn execute_logical_plan(
358 &self,
359 frontend_client: &Arc<FrontendClient>,
360 plan: &LogicalPlan,
361 ) -> Result<Option<(u32, Duration)>, Error> {
362 let instant = Instant::now();
363 let flow_id = self.config.flow_id;
364
365 debug!(
366 "Executing flow {flow_id}(expire_after={:?} secs) with query {}",
367 self.config.expire_after, &plan
368 );
369
370 let catalog = &self.config.sink_table_name[0];
371 let schema = &self.config.sink_table_name[1];
372
373 let plan = plan
375 .clone()
376 .transform_down_with_subqueries(|p| {
377 if let LogicalPlan::TableScan(mut table_scan) = p {
378 let resolved = table_scan.table_name.resolve(catalog, schema);
379 table_scan.table_name = resolved.into();
380 Ok(Transformed::yes(LogicalPlan::TableScan(table_scan)))
381 } else {
382 Ok(Transformed::no(p))
383 }
384 })
385 .with_context(|_| DatafusionSnafu {
386 context: format!("Failed to fix table ref in logical plan, plan={:?}", plan),
387 })?
388 .data;
389
390 let mut peer_desc = None;
391
392 let res = {
393 let _timer = METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME
394 .with_label_values(&[flow_id.to_string().as_str()])
395 .start_timer();
396
397 let req = if let Some((insert_to, insert_plan)) =
399 breakup_insert_plan(&plan, catalog, schema)
400 {
401 let message = DFLogicalSubstraitConvertor {}
402 .encode(&insert_plan, DefaultSerializer)
403 .context(SubstraitEncodeLogicalPlanSnafu)?;
404 api::v1::greptime_request::Request::Query(api::v1::QueryRequest {
405 query: Some(api::v1::query_request::Query::InsertIntoPlan(
406 api::v1::InsertIntoPlan {
407 table_name: Some(insert_to),
408 logical_plan: message.to_vec(),
409 },
410 )),
411 })
412 } else {
413 let message = DFLogicalSubstraitConvertor {}
414 .encode(&plan, DefaultSerializer)
415 .context(SubstraitEncodeLogicalPlanSnafu)?;
416
417 api::v1::greptime_request::Request::Query(api::v1::QueryRequest {
418 query: Some(api::v1::query_request::Query::LogicalPlan(message.to_vec())),
419 })
420 };
421
422 frontend_client
423 .handle(req, catalog, schema, &mut peer_desc)
424 .await
425 };
426
427 let elapsed = instant.elapsed();
428 if let Ok(affected_rows) = &res {
429 debug!(
430 "Flow {flow_id} executed, affected_rows: {affected_rows:?}, elapsed: {:?}",
431 elapsed
432 );
433 METRIC_FLOW_ROWS
434 .with_label_values(&[format!("{}-out-batching", flow_id).as_str()])
435 .inc_by(*affected_rows as _);
436 } else if let Err(err) = &res {
437 warn!(
438 "Failed to execute Flow {flow_id} on frontend {:?}, result: {err:?}, elapsed: {:?} with query: {}",
439 peer_desc, elapsed, &plan
440 );
441 }
442
443 if elapsed >= self.config.batch_opts.slow_query_threshold {
445 warn!(
446 "Flow {flow_id} on frontend {:?} executed for {:?} before complete, query: {}",
447 peer_desc, elapsed, &plan
448 );
449 METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY
450 .with_label_values(&[
451 flow_id.to_string().as_str(),
452 &peer_desc.unwrap_or_default().to_string(),
453 ])
454 .observe(elapsed.as_secs_f64());
455 }
456
457 self.state
458 .write()
459 .unwrap()
460 .after_query_exec(elapsed, res.is_ok());
461
462 let res = res?;
463
464 Ok(Some((res, elapsed)))
465 }
466
467 pub async fn start_executing_loop(
471 &self,
472 engine: QueryEngineRef,
473 frontend_client: Arc<FrontendClient>,
474 ) {
475 let flow_id_str = self.config.flow_id.to_string();
476 let mut max_window_cnt = None;
477 let mut interval = self
478 .config
479 .flow_eval_interval
480 .map(|d| tokio::time::interval(d));
481 if let Some(tick) = &mut interval {
482 tick.tick().await; }
484 loop {
485 {
488 let mut state = self.state.write().unwrap();
489 match state.shutdown_rx.try_recv() {
490 Ok(()) => break,
491 Err(TryRecvError::Closed) => {
492 warn!(
493 "Unexpected shutdown flow {}, shutdown anyway",
494 self.config.flow_id
495 );
496 break;
497 }
498 Err(TryRecvError::Empty) => (),
499 }
500 }
501 METRIC_FLOW_BATCHING_ENGINE_START_QUERY_CNT
502 .with_label_values(&[&flow_id_str])
503 .inc();
504
505 let min_refresh = self.config.batch_opts.experimental_min_refresh_duration;
506
507 let new_query = match self.gen_insert_plan(&engine, max_window_cnt).await {
508 Ok(new_query) => new_query,
509 Err(err) => {
510 common_telemetry::error!(err; "Failed to generate query for flow={}", self.config.flow_id);
511 tokio::time::sleep(min_refresh).await;
513 continue;
514 }
515 };
516
517 let res = if let Some(new_query) = &new_query {
518 self.execute_logical_plan(&frontend_client, &new_query.plan)
519 .await
520 } else {
521 Ok(None)
522 };
523
524 match res {
525 Ok(Some(_)) => {
527 max_window_cnt = max_window_cnt.map(|cnt| {
529 (cnt + 1).min(self.config.batch_opts.experimental_max_filter_num_per_query)
530 });
531
532 if let Some(eval_interval) = &mut interval {
534 eval_interval.tick().await;
535 } else {
536 let sleep_until = {
539 let state = self.state.write().unwrap();
540
541 let time_window_size = self
542 .config
543 .time_window_expr
544 .as_ref()
545 .and_then(|t| *t.time_window_size());
546
547 state.get_next_start_query_time(
548 self.config.flow_id,
549 &time_window_size,
550 min_refresh,
551 Some(self.config.batch_opts.query_timeout),
552 self.config.batch_opts.experimental_max_filter_num_per_query,
553 )
554 };
555
556 tokio::time::sleep_until(sleep_until).await;
557 };
558 }
559 Ok(None) => {
561 debug!(
562 "Flow id = {:?} found no new data, sleep for {:?} then continue",
563 self.config.flow_id, min_refresh
564 );
565 tokio::time::sleep(min_refresh).await;
566 continue;
567 }
568 Err(err) => {
570 METRIC_FLOW_BATCHING_ENGINE_ERROR_CNT
571 .with_label_values(&[&flow_id_str])
572 .inc();
573 match new_query {
574 Some(query) => {
575 common_telemetry::error!(err; "Failed to execute query for flow={} with query: {}", self.config.flow_id, query.plan);
576 self.state.write().unwrap().dirty_time_windows.add_windows(
578 query.filter.map(|f| f.time_ranges).unwrap_or_default(),
579 );
580 max_window_cnt = Some(1);
585 }
586 None => {
587 common_telemetry::error!(err; "Failed to generate query for flow={}", self.config.flow_id)
588 }
589 }
590 tokio::time::sleep(min_refresh).await;
592 }
593 }
594 }
595 }
596
597 async fn gen_create_table_expr(
604 &self,
605 engine: QueryEngineRef,
606 ) -> Result<CreateTableExpr, Error> {
607 let query_ctx = self.state.read().unwrap().query_ctx.clone();
608 let plan =
609 sql_to_df_plan(query_ctx.clone(), engine.clone(), &self.config.query, true).await?;
610 create_table_with_expr(&plan, &self.config.sink_table_name, &self.config.query_type)
611 }
612
613 async fn gen_query_with_time_window(
615 &self,
616 engine: QueryEngineRef,
617 sink_table_schema: &Arc<Schema>,
618 primary_key_indices: &[usize],
619 allow_partial: bool,
620 max_window_cnt: Option<usize>,
621 ) -> Result<Option<PlanInfo>, Error> {
622 let query_ctx = self.state.read().unwrap().query_ctx.clone();
623 let start = SystemTime::now();
624 let since_the_epoch = start
625 .duration_since(UNIX_EPOCH)
626 .expect("Time went backwards");
627 let low_bound = self
628 .config
629 .expire_after
630 .map(|e| since_the_epoch.as_secs() - e as u64)
631 .unwrap_or(u64::MIN);
632
633 let low_bound = Timestamp::new_second(low_bound as i64);
634
635 let expire_time_window_bound = self
636 .config
637 .time_window_expr
638 .as_ref()
639 .map(|expr| expr.eval(low_bound))
640 .transpose()?;
641
642 let (expire_lower_bound, expire_upper_bound) =
643 match (expire_time_window_bound, &self.config.query_type) {
644 (Some((Some(l), Some(u))), QueryType::Sql) => (l, u),
645 (None, QueryType::Sql) => {
646 debug!(
649 "Flow id = {:?}, no time window, using the same query",
650 self.config.flow_id
651 );
652 let is_dirty = !self.state.read().unwrap().dirty_time_windows.is_empty();
654 self.state.write().unwrap().dirty_time_windows.clean();
655
656 if !is_dirty {
657 debug!("Flow id={:?}, no new data, not update", self.config.flow_id);
659 return Ok(None);
660 }
661
662 let plan = gen_plan_with_matching_schema(
663 &self.config.query,
664 query_ctx,
665 engine,
666 sink_table_schema.clone(),
667 primary_key_indices,
668 allow_partial,
669 )
670 .await?;
671
672 return Ok(Some(PlanInfo { plan, filter: None }));
673 }
674 _ => {
675 self.state.write().unwrap().dirty_time_windows.clean();
677
678 let plan = gen_plan_with_matching_schema(
679 &self.config.query,
680 query_ctx,
681 engine,
682 sink_table_schema.clone(),
683 primary_key_indices,
684 allow_partial,
685 )
686 .await?;
687
688 return Ok(Some(PlanInfo { plan, filter: None }));
689 }
690 };
691
692 debug!(
693 "Flow id = {:?}, found time window: precise_lower_bound={:?}, precise_upper_bound={:?} with dirty time windows: {:?}",
694 self.config.flow_id,
695 expire_lower_bound,
696 expire_upper_bound,
697 self.state.read().unwrap().dirty_time_windows
698 );
699 let window_size = expire_upper_bound
700 .sub(&expire_lower_bound)
701 .with_context(|| UnexpectedSnafu {
702 reason: format!(
703 "Can't get window size from {expire_upper_bound:?} - {expire_lower_bound:?}"
704 ),
705 })?;
706 let col_name = self
707 .config
708 .time_window_expr
709 .as_ref()
710 .map(|expr| expr.column_name.clone())
711 .with_context(|| UnexpectedSnafu {
712 reason: format!(
713 "Flow id={:?}, Failed to get column name from time window expr",
714 self.config.flow_id
715 ),
716 })?;
717
718 let expr = self
719 .state
720 .write()
721 .unwrap()
722 .dirty_time_windows
723 .gen_filter_exprs(
724 &col_name,
725 Some(expire_lower_bound),
726 window_size,
727 max_window_cnt
728 .unwrap_or(self.config.batch_opts.experimental_max_filter_num_per_query),
729 self.config.flow_id,
730 Some(self),
731 )?;
732
733 debug!(
734 "Flow id={:?}, Generated filter expr: {:?}",
735 self.config.flow_id,
736 expr.as_ref()
737 .map(
738 |expr| expr_to_sql(&expr.expr).with_context(|_| DatafusionSnafu {
739 context: format!("Failed to generate filter expr from {expr:?}"),
740 })
741 )
742 .transpose()?
743 .map(|s| s.to_string())
744 );
745
746 let Some(expr) = expr else {
747 debug!("Flow id={:?}, no new data, not update", self.config.flow_id);
749 return Ok(None);
750 };
751
752 let mut add_filter = AddFilterRewriter::new(expr.expr.clone());
753 let mut add_auto_column = ColumnMatcherRewriter::new(
754 sink_table_schema.clone(),
755 primary_key_indices.to_vec(),
756 allow_partial,
757 );
758
759 let plan =
760 sql_to_df_plan(query_ctx.clone(), engine.clone(), &self.config.query, false).await?;
761 let rewrite = plan
762 .clone()
763 .rewrite(&mut add_filter)
764 .and_then(|p| p.data.rewrite(&mut add_auto_column))
765 .with_context(|_| DatafusionSnafu {
766 context: format!("Failed to rewrite plan:\n {}\n", plan),
767 })?
768 .data;
769 let new_plan = apply_df_optimizer(rewrite, &query_ctx).await?;
771
772 let info = PlanInfo {
773 plan: new_plan.clone(),
774 filter: Some(expr),
775 };
776
777 Ok(Some(info))
778 }
779}
780
781fn create_table_with_expr(
784 plan: &LogicalPlan,
785 sink_table_name: &[String; 3],
786 query_type: &QueryType,
787) -> Result<CreateTableExpr, Error> {
788 let table_def = match query_type {
789 &QueryType::Sql => {
790 if let Some(def) = build_pk_from_aggr(plan)? {
791 def
792 } else {
793 build_by_sql_schema(plan)?
794 }
795 }
796 QueryType::Tql => {
797 if let Some(table_def) = build_pk_from_aggr(plan)? {
799 table_def
800 } else {
801 build_by_tql_schema(plan)?
802 }
803 }
804 };
805 let first_time_stamp = table_def.ts_col;
806 let primary_keys = table_def.pks;
807
808 let mut column_schemas = Vec::new();
809 for field in plan.schema().fields() {
810 let name = field.name();
811 let ty = ConcreteDataType::from_arrow_type(field.data_type());
812 let col_schema = if first_time_stamp == Some(name.clone()) {
813 ColumnSchema::new(name, ty, false).with_time_index(true)
814 } else {
815 ColumnSchema::new(name, ty, true)
816 };
817
818 match query_type {
819 QueryType::Sql => {
820 column_schemas.push(col_schema);
821 }
822 QueryType::Tql => {
823 let is_tag_column = primary_keys.contains(name);
826 let is_val_column = !is_tag_column && first_time_stamp.as_ref() != Some(name);
827 if is_val_column {
828 let col_schema =
829 ColumnSchema::new(name, ConcreteDataType::float64_datatype(), true);
830 column_schemas.push(col_schema);
831 } else if is_tag_column {
832 let col_schema =
833 ColumnSchema::new(name, ConcreteDataType::string_datatype(), true);
834 column_schemas.push(col_schema);
835 } else {
836 column_schemas.push(col_schema);
838 }
839 }
840 }
841 }
842
843 if query_type == &QueryType::Sql {
844 let update_at_schema = ColumnSchema::new(
845 AUTO_CREATED_UPDATE_AT_TS_COL,
846 ConcreteDataType::timestamp_millisecond_datatype(),
847 true,
848 );
849 column_schemas.push(update_at_schema);
850 }
851
852 let time_index = if let Some(time_index) = first_time_stamp {
853 time_index
854 } else {
855 column_schemas.push(
856 ColumnSchema::new(
857 AUTO_CREATED_PLACEHOLDER_TS_COL,
858 ConcreteDataType::timestamp_millisecond_datatype(),
859 false,
860 )
861 .with_time_index(true),
862 );
863 AUTO_CREATED_PLACEHOLDER_TS_COL.to_string()
864 };
865
866 let column_defs =
867 column_schemas_to_defs(column_schemas, &primary_keys).context(ConvertColumnSchemaSnafu)?;
868 Ok(CreateTableExpr {
869 catalog_name: sink_table_name[0].clone(),
870 schema_name: sink_table_name[1].clone(),
871 table_name: sink_table_name[2].clone(),
872 desc: "Auto created table by flow engine".to_string(),
873 column_defs,
874 time_index,
875 primary_keys,
876 create_if_not_exists: true,
877 table_options: Default::default(),
878 table_id: None,
879 engine: "mito".to_string(),
880 })
881}
882
883fn build_by_sql_schema(plan: &LogicalPlan) -> Result<TableDef, Error> {
885 let first_time_stamp = plan.schema().fields().iter().find_map(|f| {
886 if ConcreteDataType::from_arrow_type(f.data_type()).is_timestamp() {
887 Some(f.name().clone())
888 } else {
889 None
890 }
891 });
892 Ok(TableDef {
893 ts_col: first_time_stamp,
894 pks: vec![],
895 })
896}
897
898fn build_by_tql_schema(plan: &LogicalPlan) -> Result<TableDef, Error> {
900 let first_time_stamp = plan.schema().fields().iter().find_map(|f| {
901 if ConcreteDataType::from_arrow_type(f.data_type()).is_timestamp() {
902 Some(f.name().clone())
903 } else {
904 None
905 }
906 });
907 let string_columns = plan
908 .schema()
909 .fields()
910 .iter()
911 .filter_map(|f| {
912 if ConcreteDataType::from_arrow_type(f.data_type()).is_string() {
913 Some(f.name().clone())
914 } else {
915 None
916 }
917 })
918 .collect::<Vec<_>>();
919
920 Ok(TableDef {
921 ts_col: first_time_stamp,
922 pks: string_columns,
923 })
924}
925
/// Derived sink-table shape: the time-index column (if found) and the
/// primary-key column names.
struct TableDef {
    /// Column to use as the time index, if one was identified.
    ts_col: Option<String>,
    /// Primary key column names.
    pks: Vec<String>,
}
930
931fn build_pk_from_aggr(plan: &LogicalPlan) -> Result<Option<TableDef>, Error> {
940 let fields = plan.schema().fields();
941 let mut pk_names = FindGroupByFinalName::default();
942
943 plan.visit(&mut pk_names)
944 .with_context(|_| DatafusionSnafu {
945 context: format!("Can't find aggr expr in plan {plan:?}"),
946 })?;
947
948 let Some(pk_final_names) = pk_names.get_group_expr_names() else {
950 return Ok(None);
951 };
952 if pk_final_names.is_empty() {
953 let first_ts_col = fields
954 .iter()
955 .find(|f| ConcreteDataType::from_arrow_type(f.data_type()).is_timestamp())
956 .map(|f| f.name().clone());
957 return Ok(Some(TableDef {
958 ts_col: first_ts_col,
959 pks: vec![],
960 }));
961 }
962
963 let all_pk_cols: Vec<_> = fields
964 .iter()
965 .filter(|f| pk_final_names.contains(f.name()))
966 .map(|f| f.name().clone())
967 .collect();
968 let first_time_stamp = fields
970 .iter()
971 .find(|f| {
972 all_pk_cols.contains(&f.name().clone())
973 && ConcreteDataType::from_arrow_type(f.data_type()).is_timestamp()
974 })
975 .map(|f| f.name().clone());
976
977 let all_pk_cols: Vec<_> = all_pk_cols
978 .into_iter()
979 .filter(|col| first_time_stamp.as_ref() != Some(col))
980 .collect();
981
982 Ok(Some(TableDef {
983 ts_col: first_time_stamp,
984 pks: all_pk_cols,
985 }))
986}
987
#[cfg(test)]
mod test {
    use api::v1::column_def::try_as_column_schema;
    use pretty_assertions::assert_eq;
    use session::context::QueryContext;

    use super::*;
    use crate::test_utils::create_test_query_engine;

    /// Verifies that `create_table_with_expr` derives the expected column
    /// schemas, primary keys and time index for several SQL query shapes.
    #[tokio::test]
    async fn test_gen_create_table_sql() {
        let query_engine = create_test_query_engine();
        let ctx = QueryContext::arc();
        // One case: the input SQL, the expected sink-table columns,
        // primary keys and time index.
        struct TestCase {
            sql: String,
            sink_table_name: String,
            column_schemas: Vec<ColumnSchema>,
            primary_keys: Vec<String>,
            time_index: String,
        }

        // The auto-added `update_at` column every SQL sink table gets.
        let update_at_schema = ColumnSchema::new(
            AUTO_CREATED_UPDATE_AT_TS_COL,
            ConcreteDataType::timestamp_millisecond_datatype(),
            true,
        );

        // The placeholder time-index column used when the query output has
        // no usable timestamp column.
        let ts_placeholder_schema = ColumnSchema::new(
            AUTO_CREATED_PLACEHOLDER_TS_COL,
            ConcreteDataType::timestamp_millisecond_datatype(),
            false,
        )
        .with_time_index(true);

        let testcases = vec![
            // Plain projection: `ts` becomes the time index.
            TestCase {
                sql: "SELECT number, ts FROM numbers_with_ts".to_string(),
                sink_table_name: "new_table".to_string(),
                column_schemas: vec![
                    ColumnSchema::new("number", ConcreteDataType::uint32_datatype(), true),
                    ColumnSchema::new(
                        "ts",
                        ConcreteDataType::timestamp_millisecond_datatype(),
                        false,
                    )
                    .with_time_index(true),
                    update_at_schema.clone(),
                ],
                primary_keys: vec![],
                time_index: "ts".to_string(),
            },
            // GROUP BY number: the aggregated `max(ts)` is nullable, so a
            // placeholder time index is appended and `number` becomes the pk.
            TestCase {
                sql: "SELECT number, max(ts) FROM numbers_with_ts GROUP BY number".to_string(),
                sink_table_name: "new_table".to_string(),
                column_schemas: vec![
                    ColumnSchema::new("number", ConcreteDataType::uint32_datatype(), true),
                    ColumnSchema::new(
                        "max(numbers_with_ts.ts)",
                        ConcreteDataType::timestamp_millisecond_datatype(),
                        true,
                    ),
                    update_at_schema.clone(),
                    ts_placeholder_schema.clone(),
                ],
                primary_keys: vec!["number".to_string()],
                time_index: AUTO_CREATED_PLACEHOLDER_TS_COL.to_string(),
            },
            // GROUP BY ts: the timestamp group-by column is promoted to the
            // time index and removed from the primary keys.
            TestCase {
                sql: "SELECT max(number), ts FROM numbers_with_ts GROUP BY ts".to_string(),
                sink_table_name: "new_table".to_string(),
                column_schemas: vec![
                    ColumnSchema::new(
                        "max(numbers_with_ts.number)",
                        ConcreteDataType::uint32_datatype(),
                        true,
                    ),
                    ColumnSchema::new(
                        "ts",
                        ConcreteDataType::timestamp_millisecond_datatype(),
                        false,
                    )
                    .with_time_index(true),
                    update_at_schema.clone(),
                ],
                primary_keys: vec![],
                time_index: "ts".to_string(),
            },
            // GROUP BY ts, number: `ts` is the time index, `number` the pk.
            TestCase {
                sql: "SELECT number, ts FROM numbers_with_ts GROUP BY ts, number".to_string(),
                sink_table_name: "new_table".to_string(),
                column_schemas: vec![
                    ColumnSchema::new("number", ConcreteDataType::uint32_datatype(), true),
                    ColumnSchema::new(
                        "ts",
                        ConcreteDataType::timestamp_millisecond_datatype(),
                        false,
                    )
                    .with_time_index(true),
                    update_at_schema.clone(),
                ],
                primary_keys: vec!["number".to_string()],
                time_index: "ts".to_string(),
            },
        ];

        for tc in testcases {
            let plan = sql_to_df_plan(ctx.clone(), query_engine.clone(), &tc.sql, true)
                .await
                .unwrap();
            let expr = create_table_with_expr(
                &plan,
                &[
                    "greptime".to_string(),
                    "public".to_string(),
                    tc.sink_table_name.clone(),
                ],
                &QueryType::Sql,
            )
            .unwrap();
            // Round-trip the column defs back into schemas for comparison.
            let column_schemas = expr
                .column_defs
                .iter()
                .map(|c| try_as_column_schema(c).unwrap())
                .collect::<Vec<_>>();
            assert_eq!(tc.column_schemas, column_schemas, "{:?}", tc.sql);
            assert_eq!(tc.primary_keys, expr.primary_keys, "{:?}", tc.sql);
            assert_eq!(tc.time_index, expr.time_index, "{:?}", tc.sql);
        }
    }
}