use std::collections::{BTreeSet, HashSet};
use std::sync::{Arc, RwLock};
use std::time::{Duration, SystemTime, UNIX_EPOCH};

use api::v1::CreateTableExpr;
use catalog::CatalogManagerRef;
use common_error::ext::BoxedError;
use common_query::logical_plan::breakup_insert_plan;
use common_telemetry::tracing::warn;
use common_telemetry::{debug, info};
use common_time::Timestamp;
use datafusion::datasource::DefaultTableSource;
use datafusion::sql::unparser::expr_to_sql;
use datafusion_common::DFSchemaRef;
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_expr::{DmlStatement, LogicalPlan, WriteOp};
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnSchema, Schema};
use operator::expr_helper::column_schemas_to_defs;
use query::QueryEngineRef;
use query::query_engine::DefaultSerializer;
use session::context::QueryContextRef;
use snafu::{OptionExt, ResultExt, ensure};
use sql::parser::{ParseOptions, ParserContext};
use sql::statements::statement::Statement;
use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
use table::table::adapter::DfTableProviderAdapter;
use tokio::sync::oneshot;
use tokio::sync::oneshot::error::TryRecvError;
use tokio::time::Instant;

use crate::adapter::{AUTO_CREATED_PLACEHOLDER_TS_COL, AUTO_CREATED_UPDATE_AT_TS_COL};
use crate::batching_mode::BatchingModeOptions;
use crate::batching_mode::frontend_client::FrontendClient;
use crate::batching_mode::state::{FilterExprInfo, TaskState};
use crate::batching_mode::time_window::TimeWindowExpr;
use crate::batching_mode::utils::{
    AddFilterRewriter, ColumnMatcherRewriter, FindGroupByFinalName, gen_plan_with_matching_schema,
    get_table_info_df_schema, sql_to_df_plan,
};
use crate::df_optimizer::apply_df_optimizer;
use crate::error::{
    ConvertColumnSchemaSnafu, DatafusionSnafu, ExternalSnafu, InvalidQuerySnafu,
    SubstraitEncodeLogicalPlanSnafu, UnexpectedSnafu,
};
use crate::metrics::{
    METRIC_FLOW_BATCHING_ENGINE_ERROR_CNT, METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME,
    METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY, METRIC_FLOW_BATCHING_ENGINE_START_QUERY_CNT,
    METRIC_FLOW_ROWS,
};
use crate::{Error, FlowId};

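/// Static configuration of a batching-mode flow task.
///
/// Built once in [`BatchingTask::try_new`] and shared behind an `Arc`; the mutable
/// runtime state lives in [`TaskState`].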
#[derive(Clone)]
pub struct TaskConfig {
    pub flow_id: FlowId,
    pub query: String,
    pub output_schema: DFSchemaRef,
    pub time_window_expr: Option<TimeWindowExpr>,
    pub expire_after: Option<i64>,
    pub sink_table_name: [String; 3],
    pub source_table_names: HashSet<[String; 3]>,
    pub catalog_manager: CatalogManagerRef,
    pub query_type: QueryType,
    pub batch_opts: Arc<BatchingModeOptions>,
    pub flow_eval_interval: Option<Duration>,
}

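/// Parses `query` with the session's SQL dialect and classifies it: a single TQL
/// statement yields [`QueryType::Tql`], any other single statement is treated as
/// [`QueryType::Sql`]. Multiple statements are rejected.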
fn determine_query_type(query: &str, query_ctx: &QueryContextRef) -> Result<QueryType, Error> {
    let stmts =
        ParserContext::create_with_dialect(query, query_ctx.sql_dialect(), ParseOptions::default())
            .map_err(BoxedError::new)
            .context(ExternalSnafu)?;

    ensure!(
        stmts.len() == 1,
        InvalidQuerySnafu {
            reason: format!("Expect only one statement, found {}", stmts.len())
        }
    );
    let stmt = &stmts[0];
    match stmt {
        Statement::Tql(_) => Ok(QueryType::Tql),
        _ => Ok(QueryType::Sql),
    }
}

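/// The kind of query a flow task evaluates (TQL or plain SQL).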
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum QueryType {
    Tql,
    Sql,
}

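/// A flow task evaluated in batching mode: the flow's query is periodically rewritten
/// with time-window filters over dirty data and the result is inserted into the sink
/// table through a frontend.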
#[derive(Clone)]
pub struct BatchingTask {
    pub config: Arc<TaskConfig>,
    pub state: Arc<RwLock<TaskState>>,
}

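/// Arguments for [`BatchingTask::try_new`], grouped into a struct to keep the
/// constructor signature manageable.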
pub struct TaskArgs<'a> {
    pub flow_id: FlowId,
    pub query: &'a str,
    pub plan: LogicalPlan,
    pub time_window_expr: Option<TimeWindowExpr>,
    pub expire_after: Option<i64>,
    pub sink_table_name: [String; 3],
    pub source_table_names: Vec<[String; 3]>,
    pub query_ctx: QueryContextRef,
    pub catalog_manager: CatalogManagerRef,
    pub shutdown_rx: oneshot::Receiver<()>,
    pub batch_opts: Arc<BatchingModeOptions>,
    pub flow_eval_interval: Option<Duration>,
}

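/// A generated insert plan plus the time-window filter (if any) that was pushed into
/// it; on execution failure the filter's time ranges are re-marked as dirty.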
pub struct PlanInfo {
    pub plan: LogicalPlan,
    pub filter: Option<FilterExprInfo>,
}

impl BatchingTask {
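    /// Creates the task: captures the plan's output schema, classifies the query as
    /// SQL or TQL, and wraps the runtime state (with the shutdown receiver) in a lock.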
    #[allow(clippy::too_many_arguments)]
    pub fn try_new(
        TaskArgs {
            flow_id,
            query,
            plan,
            time_window_expr,
            expire_after,
            sink_table_name,
            source_table_names,
            query_ctx,
            catalog_manager,
            shutdown_rx,
            batch_opts,
            flow_eval_interval,
        }: TaskArgs<'_>,
    ) -> Result<Self, Error> {
        Ok(Self {
            config: Arc::new(TaskConfig {
                flow_id,
                query: query.to_string(),
                time_window_expr,
                expire_after,
                sink_table_name,
                source_table_names: source_table_names.into_iter().collect(),
                catalog_manager,
                output_schema: plan.schema().clone(),
                query_type: determine_query_type(query, &query_ctx)?,
                batch_opts,
                flow_eval_interval,
            }),
            state: Arc::new(RwLock::new(TaskState::new(query_ctx, shutdown_rx))),
        })
    }

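    /// Marks the whole valid time range, from the expire-after lower bound (or epoch 0
    /// when unset) up to now, as dirty so the next run recomputes it.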
    pub fn mark_all_windows_as_dirty(&self) -> Result<(), Error> {
        let now = SystemTime::now();
        let now = Timestamp::new_second(
            now.duration_since(UNIX_EPOCH)
                .expect("Time went backwards")
                .as_secs() as _,
        );
        let lower_bound = self
            .config
            .expire_after
            .map(|e| now.sub_duration(Duration::from_secs(e as _)))
            .transpose()
            .map_err(BoxedError::new)
            .context(ExternalSnafu)?
            .unwrap_or(Timestamp::new_second(0));
        debug!(
            "Flow {} marking range ({:?}, {:?}) as dirty",
            self.config.flow_id, lower_bound, now
        );
        self.state
            .write()
            .unwrap()
            .dirty_time_windows
            .add_window(lower_bound, Some(now));
        Ok(())
    }

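    /// Creates the sink table (if it does not exist yet) using a schema derived from
    /// the flow's query.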
    pub async fn check_or_create_sink_table(
        &self,
        engine: &QueryEngineRef,
        frontend_client: &Arc<FrontendClient>,
    ) -> Result<Option<(u32, Duration)>, Error> {
        if !self.is_table_exist(&self.config.sink_table_name).await? {
            let create_table = self.gen_create_table_expr(engine.clone()).await?;
            info!(
                "Try creating sink table(if not exists) with expr: {:?}",
                create_table
            );
            self.create_table(frontend_client, create_table).await?;
            info!(
                "Sink table {}(if not exists) created",
                self.config.sink_table_name.join(".")
            );
        }

        Ok(None)
    }

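    /// Checks the catalog for the `[catalog, schema, table]` given in `table_name`.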
    async fn is_table_exist(&self, table_name: &[String; 3]) -> Result<bool, Error> {
        self.config
            .catalog_manager
            .table_exists(&table_name[0], &table_name[1], &table_name[2], None)
            .await
            .map_err(BoxedError::new)
            .context(ExternalSnafu)
    }

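    /// Generates one insert plan (if there is dirty data) and executes it, returning
    /// the affected row count and elapsed time when a query was actually run.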
    pub async fn gen_exec_once(
        &self,
        engine: &QueryEngineRef,
        frontend_client: &Arc<FrontendClient>,
        max_window_cnt: Option<usize>,
    ) -> Result<Option<(u32, Duration)>, Error> {
        if let Some(new_query) = self.gen_insert_plan(engine, max_window_cnt).await? {
            debug!("Generated new query: {}", new_query.plan);
            self.execute_logical_plan(frontend_client, &new_query.plan)
                .await
        } else {
            debug!("Generated no query");
            Ok(None)
        }
    }

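    /// Builds an `INSERT INTO <sink table>` plan from the (possibly time-window
    /// filtered) flow query, after checking that every output column exists in the
    /// sink table.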
    pub async fn gen_insert_plan(
        &self,
        engine: &QueryEngineRef,
        max_window_cnt: Option<usize>,
    ) -> Result<Option<PlanInfo>, Error> {
        let (table, df_schema) = get_table_info_df_schema(
            self.config.catalog_manager.clone(),
            self.config.sink_table_name.clone(),
        )
        .await?;

        let new_query = self
            .gen_query_with_time_window(
                engine.clone(),
                &table.table_info().meta.schema,
                max_window_cnt,
            )
            .await?;

        let insert_into_info = if let Some(new_query) = new_query {
            let table_columns = df_schema
                .columns()
                .into_iter()
                .map(|c| c.name)
                .collect::<BTreeSet<_>>();
            for column in new_query.plan.schema().columns() {
                ensure!(
                    table_columns.contains(column.name()),
                    InvalidQuerySnafu {
                        reason: format!(
                            "Column {} not found in sink table with columns {:?}",
                            column, table_columns
                        ),
                    }
                );
            }

            let table_provider = Arc::new(DfTableProviderAdapter::new(table));
            let table_source = Arc::new(DefaultTableSource::new(table_provider));

            let plan = LogicalPlan::Dml(DmlStatement::new(
                datafusion_common::TableReference::Full {
                    catalog: self.config.sink_table_name[0].clone().into(),
                    schema: self.config.sink_table_name[1].clone().into(),
                    table: self.config.sink_table_name[2].clone().into(),
                },
                table_source,
                WriteOp::Insert(datafusion_expr::dml::InsertOp::Append),
                Arc::new(new_query.plan),
            ));
            PlanInfo {
                plan,
                filter: new_query.filter,
            }
        } else {
            return Ok(None);
        };
        let insert_into = insert_into_info
            .plan
            .recompute_schema()
            .context(DatafusionSnafu {
                context: "Failed to recompute schema",
            })?;

        Ok(Some(PlanInfo {
            plan: insert_into,
            filter: insert_into_info.filter,
        }))
    }

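    /// Sends the `CreateTableExpr` to a frontend under the sink table's catalog and
    /// schema.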
    pub async fn create_table(
        &self,
        frontend_client: &Arc<FrontendClient>,
        expr: CreateTableExpr,
    ) -> Result<(), Error> {
        let catalog = &self.config.sink_table_name[0];
        let schema = &self.config.sink_table_name[1];
        frontend_client
            .create(expr.clone(), catalog, schema)
            .await?;
        Ok(())
    }

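    /// Executes `plan` on a frontend: table references are resolved against the sink
    /// catalog/schema, the plan is encoded as Substrait, and slow or failed queries
    /// are logged and counted in metrics. Returns the affected rows and elapsed time.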
    pub async fn execute_logical_plan(
        &self,
        frontend_client: &Arc<FrontendClient>,
        plan: &LogicalPlan,
    ) -> Result<Option<(u32, Duration)>, Error> {
        let instant = Instant::now();
        let flow_id = self.config.flow_id;

        debug!(
            "Executing flow {flow_id}(expire_after={:?} secs) with query {}",
            self.config.expire_after, &plan
        );

        let catalog = &self.config.sink_table_name[0];
        let schema = &self.config.sink_table_name[1];

        let plan = plan
            .clone()
            .transform_down_with_subqueries(|p| {
                if let LogicalPlan::TableScan(mut table_scan) = p {
                    let resolved = table_scan.table_name.resolve(catalog, schema);
                    table_scan.table_name = resolved.into();
                    Ok(Transformed::yes(LogicalPlan::TableScan(table_scan)))
                } else {
                    Ok(Transformed::no(p))
                }
            })
            .with_context(|_| DatafusionSnafu {
                context: format!("Failed to fix table ref in logical plan, plan={:?}", plan),
            })?
            .data;

        let mut peer_desc = None;

        let res = {
            let _timer = METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME
                .with_label_values(&[flow_id.to_string().as_str()])
                .start_timer();

            let req = if let Some((insert_to, insert_plan)) =
                breakup_insert_plan(&plan, catalog, schema)
            {
                let message = DFLogicalSubstraitConvertor {}
                    .encode(&insert_plan, DefaultSerializer)
                    .context(SubstraitEncodeLogicalPlanSnafu)?;
                api::v1::greptime_request::Request::Query(api::v1::QueryRequest {
                    query: Some(api::v1::query_request::Query::InsertIntoPlan(
                        api::v1::InsertIntoPlan {
                            table_name: Some(insert_to),
                            logical_plan: message.to_vec(),
                        },
                    )),
                })
            } else {
                let message = DFLogicalSubstraitConvertor {}
                    .encode(&plan, DefaultSerializer)
                    .context(SubstraitEncodeLogicalPlanSnafu)?;

                api::v1::greptime_request::Request::Query(api::v1::QueryRequest {
                    query: Some(api::v1::query_request::Query::LogicalPlan(message.to_vec())),
                })
            };

            frontend_client
                .handle(req, catalog, schema, &mut peer_desc)
                .await
        };

        let elapsed = instant.elapsed();
        if let Ok(affected_rows) = &res {
            debug!(
                "Flow {flow_id} executed, affected_rows: {affected_rows:?}, elapsed: {:?}",
                elapsed
            );
            METRIC_FLOW_ROWS
                .with_label_values(&[format!("{}-out-batching", flow_id).as_str()])
                .inc_by(*affected_rows as _);
        } else if let Err(err) = &res {
            warn!(
                "Failed to execute Flow {flow_id} on frontend {:?}, result: {err:?}, elapsed: {:?} with query: {}",
                peer_desc, elapsed, &plan
            );
        }

        if elapsed >= self.config.batch_opts.slow_query_threshold {
            warn!(
                "Flow {flow_id} on frontend {:?} executed for {:?} before completing, query: {}",
                peer_desc, elapsed, &plan
            );
            METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY
                .with_label_values(&[
                    flow_id.to_string().as_str(),
                    &peer_desc.unwrap_or_default().to_string(),
                ])
                .observe(elapsed.as_secs_f64());
        }

        self.state
            .write()
            .unwrap()
            .after_query_exec(elapsed, res.is_ok());

        let res = res?;

        Ok(Some((res, elapsed)))
    }

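    /// Runs the task until shutdown: each iteration generates and executes a new
    /// insert plan, then waits for the eval interval (if configured) or until the
    /// next scheduled query time; failed windows are re-marked dirty and retried.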
    pub async fn start_executing_loop(
        &self,
        engine: QueryEngineRef,
        frontend_client: Arc<FrontendClient>,
    ) {
        let flow_id_str = self.config.flow_id.to_string();
        let mut max_window_cnt = None;
        let mut interval = self
            .config
            .flow_eval_interval
            .map(|d| tokio::time::interval(d));
        if let Some(tick) = &mut interval {
            tick.tick().await;
        }
        loop {
            {
                let mut state = self.state.write().unwrap();
                match state.shutdown_rx.try_recv() {
                    Ok(()) => break,
                    Err(TryRecvError::Closed) => {
                        warn!(
                            "Unexpected shutdown flow {}, shutdown anyway",
                            self.config.flow_id
                        );
                        break;
                    }
                    Err(TryRecvError::Empty) => (),
                }
            }
            METRIC_FLOW_BATCHING_ENGINE_START_QUERY_CNT
                .with_label_values(&[&flow_id_str])
                .inc();

            let min_refresh = self.config.batch_opts.experimental_min_refresh_duration;

            let new_query = match self.gen_insert_plan(&engine, max_window_cnt).await {
                Ok(new_query) => new_query,
                Err(err) => {
                    common_telemetry::error!(err; "Failed to generate query for flow={}", self.config.flow_id);
                    tokio::time::sleep(min_refresh).await;
                    continue;
                }
            };

            let res = if let Some(new_query) = &new_query {
                self.execute_logical_plan(&frontend_client, &new_query.plan)
                    .await
            } else {
                Ok(None)
            };

            match res {
                Ok(Some(_)) => {
                    max_window_cnt = max_window_cnt.map(|cnt| {
                        (cnt + 1).min(self.config.batch_opts.experimental_max_filter_num_per_query)
                    });

                    if let Some(eval_interval) = &mut interval {
                        eval_interval.tick().await;
                    } else {
                        let sleep_until = {
                            let state = self.state.write().unwrap();

                            let time_window_size = self
                                .config
                                .time_window_expr
                                .as_ref()
                                .and_then(|t| *t.time_window_size());

                            state.get_next_start_query_time(
                                self.config.flow_id,
                                &time_window_size,
                                min_refresh,
                                Some(self.config.batch_opts.query_timeout),
                                self.config.batch_opts.experimental_max_filter_num_per_query,
                            )
                        };

                        tokio::time::sleep_until(sleep_until).await;
                    };
                }
                Ok(None) => {
                    debug!(
                        "Flow id = {:?} found no new data, sleep for {:?} then continue",
                        self.config.flow_id, min_refresh
                    );
                    tokio::time::sleep(min_refresh).await;
                    continue;
                }
                Err(err) => {
                    METRIC_FLOW_BATCHING_ENGINE_ERROR_CNT
                        .with_label_values(&[&flow_id_str])
                        .inc();
                    match new_query {
                        Some(query) => {
                            common_telemetry::error!(err; "Failed to execute query for flow={} with query: {}", self.config.flow_id, query.plan);
                            self.state.write().unwrap().dirty_time_windows.add_windows(
                                query.filter.map(|f| f.time_ranges).unwrap_or_default(),
                            );
                            max_window_cnt = Some(1);
                        }
                        None => {
                            common_telemetry::error!(err; "Failed to generate query for flow={}", self.config.flow_id)
                        }
                    }
                    tokio::time::sleep(min_refresh).await;
                }
            }
        }
    }

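    /// Derives a `CreateTableExpr` for the sink table from the optimized plan of the
    /// flow's query.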
    async fn gen_create_table_expr(
        &self,
        engine: QueryEngineRef,
    ) -> Result<CreateTableExpr, Error> {
        let query_ctx = self.state.read().unwrap().query_ctx.clone();
        let plan =
            sql_to_df_plan(query_ctx.clone(), engine.clone(), &self.config.query, true).await?;
        create_table_with_expr(&plan, &self.config.sink_table_name, &self.config.query_type)
    }

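    /// Rewrites the flow's query with filters covering the dirty time windows (up to
    /// `max_window_cnt` of them). Returns `None` when there is nothing to recompute;
    /// queries without a usable time window are returned unfiltered.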
    async fn gen_query_with_time_window(
        &self,
        engine: QueryEngineRef,
        sink_table_schema: &Arc<Schema>,
        max_window_cnt: Option<usize>,
    ) -> Result<Option<PlanInfo>, Error> {
        let query_ctx = self.state.read().unwrap().query_ctx.clone();
        let start = SystemTime::now();
        let since_the_epoch = start
            .duration_since(UNIX_EPOCH)
            .expect("Time went backwards");
        let low_bound = self
            .config
            .expire_after
            .map(|e| since_the_epoch.as_secs() - e as u64)
            .unwrap_or(u64::MIN);

        let low_bound = Timestamp::new_second(low_bound as i64);

        let expire_time_window_bound = self
            .config
            .time_window_expr
            .as_ref()
            .map(|expr| expr.eval(low_bound))
            .transpose()?;

        let (expire_lower_bound, expire_upper_bound) =
            match (expire_time_window_bound, &self.config.query_type) {
                (Some((Some(l), Some(u))), QueryType::Sql) => (l, u),
                (None, QueryType::Sql) => {
                    debug!(
                        "Flow id = {:?}, no time window, using the same query",
                        self.config.flow_id
                    );
                    let is_dirty = !self.state.read().unwrap().dirty_time_windows.is_empty();
                    self.state.write().unwrap().dirty_time_windows.clean();

                    if !is_dirty {
                        debug!("Flow id={:?}, no new data, not updating", self.config.flow_id);
                        return Ok(None);
                    }

                    let plan = gen_plan_with_matching_schema(
                        &self.config.query,
                        query_ctx,
                        engine,
                        sink_table_schema.clone(),
                    )
                    .await?;

                    return Ok(Some(PlanInfo { plan, filter: None }));
                }
                _ => {
                    self.state.write().unwrap().dirty_time_windows.clean();

                    let plan = gen_plan_with_matching_schema(
                        &self.config.query,
                        query_ctx,
                        engine,
                        sink_table_schema.clone(),
                    )
                    .await?;

                    return Ok(Some(PlanInfo { plan, filter: None }));
                }
            };

        debug!(
            "Flow id = {:?}, found time window: precise_lower_bound={:?}, precise_upper_bound={:?} with dirty time windows: {:?}",
            self.config.flow_id,
            expire_lower_bound,
            expire_upper_bound,
            self.state.read().unwrap().dirty_time_windows
        );
        let window_size = expire_upper_bound
            .sub(&expire_lower_bound)
            .with_context(|| UnexpectedSnafu {
                reason: format!(
                    "Can't get window size from {expire_upper_bound:?} - {expire_lower_bound:?}"
                ),
            })?;
        let col_name = self
            .config
            .time_window_expr
            .as_ref()
            .map(|expr| expr.column_name.clone())
            .with_context(|| UnexpectedSnafu {
                reason: format!(
                    "Flow id={:?}, Failed to get column name from time window expr",
                    self.config.flow_id
                ),
            })?;

        let expr = self
            .state
            .write()
            .unwrap()
            .dirty_time_windows
            .gen_filter_exprs(
                &col_name,
                Some(expire_lower_bound),
                window_size,
                max_window_cnt
                    .unwrap_or(self.config.batch_opts.experimental_max_filter_num_per_query),
                self.config.flow_id,
                Some(self),
            )?;

        debug!(
            "Flow id={:?}, Generated filter expr: {:?}",
            self.config.flow_id,
            expr.as_ref()
                .map(
                    |expr| expr_to_sql(&expr.expr).with_context(|_| DatafusionSnafu {
                        context: format!("Failed to generate filter expr from {expr:?}"),
                    })
                )
                .transpose()?
                .map(|s| s.to_string())
        );

        let Some(expr) = expr else {
            debug!("Flow id={:?}, no new data, not updating", self.config.flow_id);
            return Ok(None);
        };

        let mut add_filter = AddFilterRewriter::new(expr.expr.clone());
        let mut add_auto_column = ColumnMatcherRewriter::new(sink_table_schema.clone());

        let plan =
            sql_to_df_plan(query_ctx.clone(), engine.clone(), &self.config.query, false).await?;
        let rewrite = plan
            .clone()
            .rewrite(&mut add_filter)
            .and_then(|p| p.data.rewrite(&mut add_auto_column))
            .with_context(|_| DatafusionSnafu {
                context: format!("Failed to rewrite plan:\n {}\n", plan),
            })?
            .data;
        let new_plan = apply_df_optimizer(rewrite, &query_ctx).await?;

        let info = PlanInfo {
            plan: new_plan.clone(),
            filter: Some(expr),
        };

        Ok(Some(info))
    }
}

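/// Builds a `CREATE TABLE IF NOT EXISTS` expression for the sink table from the
/// plan's output schema. SQL flows get an extra `update_at` column, and a placeholder
/// time index is appended when no timestamp column is available.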
fn create_table_with_expr(
    plan: &LogicalPlan,
    sink_table_name: &[String; 3],
    query_type: &QueryType,
) -> Result<CreateTableExpr, Error> {
    let table_def = match query_type {
        QueryType::Sql => {
            if let Some(def) = build_pk_from_aggr(plan)? {
                def
            } else {
                build_by_sql_schema(plan)?
            }
        }
        QueryType::Tql => {
            if let Some(table_def) = build_pk_from_aggr(plan)? {
                table_def
            } else {
                build_by_tql_schema(plan)?
            }
        }
    };
    let first_time_stamp = table_def.ts_col;
    let primary_keys = table_def.pks;

    let mut column_schemas = Vec::new();
    for field in plan.schema().fields() {
        let name = field.name();
        let ty = ConcreteDataType::from_arrow_type(field.data_type());
        let col_schema = if first_time_stamp == Some(name.clone()) {
            ColumnSchema::new(name, ty, false).with_time_index(true)
        } else {
            ColumnSchema::new(name, ty, true)
        };

        match query_type {
            QueryType::Sql => {
                column_schemas.push(col_schema);
            }
            QueryType::Tql => {
                let is_tag_column = primary_keys.contains(name);
                let is_val_column = !is_tag_column && first_time_stamp.as_ref() != Some(name);
                if is_val_column {
                    let col_schema =
                        ColumnSchema::new(name, ConcreteDataType::float64_datatype(), true);
                    column_schemas.push(col_schema);
                } else if is_tag_column {
                    let col_schema =
                        ColumnSchema::new(name, ConcreteDataType::string_datatype(), true);
                    column_schemas.push(col_schema);
                } else {
                    column_schemas.push(col_schema);
                }
            }
        }
    }

    if query_type == &QueryType::Sql {
        let update_at_schema = ColumnSchema::new(
            AUTO_CREATED_UPDATE_AT_TS_COL,
            ConcreteDataType::timestamp_millisecond_datatype(),
            true,
        );
        column_schemas.push(update_at_schema);
    }

    let time_index = if let Some(time_index) = first_time_stamp {
        time_index
    } else {
        column_schemas.push(
            ColumnSchema::new(
                AUTO_CREATED_PLACEHOLDER_TS_COL,
                ConcreteDataType::timestamp_millisecond_datatype(),
                false,
            )
            .with_time_index(true),
        );
        AUTO_CREATED_PLACEHOLDER_TS_COL.to_string()
    };

    let column_defs =
        column_schemas_to_defs(column_schemas, &primary_keys).context(ConvertColumnSchemaSnafu)?;
    Ok(CreateTableExpr {
        catalog_name: sink_table_name[0].clone(),
        schema_name: sink_table_name[1].clone(),
        table_name: sink_table_name[2].clone(),
        desc: "Auto created table by flow engine".to_string(),
        column_defs,
        time_index,
        primary_keys,
        create_if_not_exists: true,
        table_options: Default::default(),
        table_id: None,
        engine: "mito".to_string(),
    })
}

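/// Table definition for a plain SQL query: the first timestamp column (if any)
/// becomes the time index and no primary keys are set.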
fn build_by_sql_schema(plan: &LogicalPlan) -> Result<TableDef, Error> {
    let first_time_stamp = plan.schema().fields().iter().find_map(|f| {
        if ConcreteDataType::from_arrow_type(f.data_type()).is_timestamp() {
            Some(f.name().clone())
        } else {
            None
        }
    });
    Ok(TableDef {
        ts_col: first_time_stamp,
        pks: vec![],
    })
}

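/// Table definition for a TQL query: the first timestamp column becomes the time
/// index and all string columns are treated as tags (primary keys).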
fn build_by_tql_schema(plan: &LogicalPlan) -> Result<TableDef, Error> {
    let first_time_stamp = plan.schema().fields().iter().find_map(|f| {
        if ConcreteDataType::from_arrow_type(f.data_type()).is_timestamp() {
            Some(f.name().clone())
        } else {
            None
        }
    });
    let string_columns = plan
        .schema()
        .fields()
        .iter()
        .filter_map(|f| {
            if ConcreteDataType::from_arrow_type(f.data_type()).is_string() {
                Some(f.name().clone())
            } else {
                None
            }
        })
        .collect::<Vec<_>>();

    Ok(TableDef {
        ts_col: first_time_stamp,
        pks: string_columns,
    })
}

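/// Time-index column and primary keys derived from a query plan, used when creating
/// the sink table.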
struct TableDef {
    ts_col: Option<String>,
    pks: Vec<String>,
}

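/// Derives the table definition from the final `GROUP BY` expressions of an
/// aggregate plan: group-by columns become primary keys, except a timestamp group-by
/// column, which becomes the time index. Returns `None` when no aggregation is found.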
fn build_pk_from_aggr(plan: &LogicalPlan) -> Result<Option<TableDef>, Error> {
    let fields = plan.schema().fields();
    let mut pk_names = FindGroupByFinalName::default();

    plan.visit(&mut pk_names)
        .with_context(|_| DatafusionSnafu {
            context: format!("Can't find aggr expr in plan {plan:?}"),
        })?;

    let Some(pk_final_names) = pk_names.get_group_expr_names() else {
        return Ok(None);
    };
    if pk_final_names.is_empty() {
        let first_ts_col = fields
            .iter()
            .find(|f| ConcreteDataType::from_arrow_type(f.data_type()).is_timestamp())
            .map(|f| f.name().clone());
        return Ok(Some(TableDef {
            ts_col: first_ts_col,
            pks: vec![],
        }));
    }

    let all_pk_cols: Vec<_> = fields
        .iter()
        .filter(|f| pk_final_names.contains(f.name()))
        .map(|f| f.name().clone())
        .collect();
    let first_time_stamp = fields
        .iter()
        .find(|f| {
            all_pk_cols.contains(f.name())
                && ConcreteDataType::from_arrow_type(f.data_type()).is_timestamp()
        })
        .map(|f| f.name().clone());

    let all_pk_cols: Vec<_> = all_pk_cols
        .into_iter()
        .filter(|col| first_time_stamp != Some(col.to_string()))
        .collect();

    Ok(Some(TableDef {
        ts_col: first_time_stamp,
        pks: all_pk_cols,
    }))
}

#[cfg(test)]
mod test {
    use api::v1::column_def::try_as_column_schema;
    use pretty_assertions::assert_eq;
    use session::context::QueryContext;

    use super::*;
    use crate::test_utils::create_test_query_engine;

    #[tokio::test]
    async fn test_gen_create_table_sql() {
        let query_engine = create_test_query_engine();
        let ctx = QueryContext::arc();
        struct TestCase {
            sql: String,
            sink_table_name: String,
            column_schemas: Vec<ColumnSchema>,
            primary_keys: Vec<String>,
            time_index: String,
        }

        let update_at_schema = ColumnSchema::new(
            AUTO_CREATED_UPDATE_AT_TS_COL,
            ConcreteDataType::timestamp_millisecond_datatype(),
            true,
        );

        let ts_placeholder_schema = ColumnSchema::new(
            AUTO_CREATED_PLACEHOLDER_TS_COL,
            ConcreteDataType::timestamp_millisecond_datatype(),
            false,
        )
        .with_time_index(true);

        let testcases = vec![
            TestCase {
                sql: "SELECT number, ts FROM numbers_with_ts".to_string(),
                sink_table_name: "new_table".to_string(),
                column_schemas: vec![
                    ColumnSchema::new("number", ConcreteDataType::uint32_datatype(), true),
                    ColumnSchema::new(
                        "ts",
                        ConcreteDataType::timestamp_millisecond_datatype(),
                        false,
                    )
                    .with_time_index(true),
                    update_at_schema.clone(),
                ],
                primary_keys: vec![],
                time_index: "ts".to_string(),
            },
            TestCase {
                sql: "SELECT number, max(ts) FROM numbers_with_ts GROUP BY number".to_string(),
                sink_table_name: "new_table".to_string(),
                column_schemas: vec![
                    ColumnSchema::new("number", ConcreteDataType::uint32_datatype(), true),
                    ColumnSchema::new(
                        "max(numbers_with_ts.ts)",
                        ConcreteDataType::timestamp_millisecond_datatype(),
                        true,
                    ),
                    update_at_schema.clone(),
                    ts_placeholder_schema.clone(),
                ],
                primary_keys: vec!["number".to_string()],
                time_index: AUTO_CREATED_PLACEHOLDER_TS_COL.to_string(),
            },
            TestCase {
                sql: "SELECT max(number), ts FROM numbers_with_ts GROUP BY ts".to_string(),
                sink_table_name: "new_table".to_string(),
                column_schemas: vec![
                    ColumnSchema::new(
                        "max(numbers_with_ts.number)",
                        ConcreteDataType::uint32_datatype(),
                        true,
                    ),
                    ColumnSchema::new(
                        "ts",
                        ConcreteDataType::timestamp_millisecond_datatype(),
                        false,
                    )
                    .with_time_index(true),
                    update_at_schema.clone(),
                ],
                primary_keys: vec![],
                time_index: "ts".to_string(),
            },
            TestCase {
                sql: "SELECT number, ts FROM numbers_with_ts GROUP BY ts, number".to_string(),
                sink_table_name: "new_table".to_string(),
                column_schemas: vec![
                    ColumnSchema::new("number", ConcreteDataType::uint32_datatype(), true),
                    ColumnSchema::new(
                        "ts",
                        ConcreteDataType::timestamp_millisecond_datatype(),
                        false,
                    )
                    .with_time_index(true),
                    update_at_schema.clone(),
                ],
                primary_keys: vec!["number".to_string()],
                time_index: "ts".to_string(),
            },
        ];

        for tc in testcases {
            let plan = sql_to_df_plan(ctx.clone(), query_engine.clone(), &tc.sql, true)
                .await
                .unwrap();
            let expr = create_table_with_expr(
                &plan,
                &[
                    "greptime".to_string(),
                    "public".to_string(),
                    tc.sink_table_name.clone(),
                ],
                &QueryType::Sql,
            )
            .unwrap();
            let column_schemas = expr
                .column_defs
                .iter()
                .map(|c| try_as_column_schema(c).unwrap())
                .collect::<Vec<_>>();
            assert_eq!(tc.column_schemas, column_schemas, "{:?}", tc.sql);
            assert_eq!(tc.primary_keys, expr.primary_keys, "{:?}", tc.sql);
            assert_eq!(tc.time_index, expr.time_index, "{:?}", tc.sql);
        }
    }
}