1use std::collections::{BTreeSet, HashMap, HashSet};
16use std::sync::{Arc, RwLock};
17use std::time::{Duration, SystemTime, UNIX_EPOCH};
18
19use api::v1::CreateTableExpr;
20use catalog::CatalogManagerRef;
21use common_error::ext::BoxedError;
22use common_query::logical_plan::breakup_insert_plan;
23use common_telemetry::tracing::warn;
24use common_telemetry::{debug, info};
25use common_time::Timestamp;
26use datafusion::datasource::DefaultTableSource;
27use datafusion::sql::unparser::expr_to_sql;
28use datafusion_common::DFSchemaRef;
29use datafusion_common::tree_node::{Transformed, TreeNode};
30use datafusion_expr::{DmlStatement, LogicalPlan, WriteOp};
31use datatypes::prelude::ConcreteDataType;
32use datatypes::schema::{ColumnSchema, Schema};
33use operator::expr_helper::column_schemas_to_defs;
34use query::QueryEngineRef;
35use query::query_engine::DefaultSerializer;
36use session::context::QueryContextRef;
37use snafu::{OptionExt, ResultExt, ensure};
38use sql::parsers::utils::is_tql;
39use store_api::mito_engine_options::MERGE_MODE_KEY;
40use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
41use table::table::adapter::DfTableProviderAdapter;
42use tokio::sync::oneshot;
43use tokio::sync::oneshot::error::TryRecvError;
44use tokio::time::Instant;
45
46use crate::adapter::{AUTO_CREATED_PLACEHOLDER_TS_COL, AUTO_CREATED_UPDATE_AT_TS_COL};
47use crate::batching_mode::BatchingModeOptions;
48use crate::batching_mode::frontend_client::FrontendClient;
49use crate::batching_mode::state::{FilterExprInfo, TaskState};
50use crate::batching_mode::time_window::TimeWindowExpr;
51use crate::batching_mode::utils::{
52 AddFilterRewriter, ColumnMatcherRewriter, FindGroupByFinalName, gen_plan_with_matching_schema,
53 get_table_info_df_schema, sql_to_df_plan,
54};
55use crate::df_optimizer::apply_df_optimizer;
56use crate::error::{
57 ConvertColumnSchemaSnafu, DatafusionSnafu, ExternalSnafu, InvalidQuerySnafu,
58 SubstraitEncodeLogicalPlanSnafu, UnexpectedSnafu,
59};
60use crate::metrics::{
61 METRIC_FLOW_BATCHING_ENGINE_ERROR_CNT, METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME,
62 METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY, METRIC_FLOW_BATCHING_ENGINE_START_QUERY_CNT,
63 METRIC_FLOW_ROWS,
64};
65use crate::{Error, FlowId};
66
/// Immutable configuration shared by all clones of a batching-mode flow task.
#[derive(Clone)]
pub struct TaskConfig {
    pub flow_id: FlowId,
    /// The flow's defining query text (SQL or TQL).
    pub query: String,
    /// Output schema of the optimized plan of `query`.
    pub output_schema: DFSchemaRef,
    /// Expression used to compute time-window bounds for incremental updates.
    pub time_window_expr: Option<TimeWindowExpr>,
    /// Data older than this many seconds is considered expired.
    pub expire_after: Option<i64>,
    /// Sink table as `[catalog, schema, table]`.
    pub sink_table_name: [String; 3],
    pub source_table_names: HashSet<[String; 3]>,
    pub catalog_manager: CatalogManagerRef,
    /// Whether `query` was detected as SQL or TQL (see `determine_query_type`).
    pub query_type: QueryType,
    pub batch_opts: Arc<BatchingModeOptions>,
    /// Fixed evaluation interval; when `None` the next run time is derived
    /// from task state (see `start_executing_loop`).
    pub flow_eval_interval: Option<Duration>,
}
84
85fn determine_query_type(query: &str, query_ctx: &QueryContextRef) -> Result<QueryType, Error> {
86 let is_tql = is_tql(query_ctx.sql_dialect(), query)
87 .map_err(BoxedError::new)
88 .context(ExternalSnafu)?;
89 Ok(if is_tql {
90 QueryType::Tql
91 } else {
92 QueryType::Sql
93 })
94}
95
96fn is_merge_mode_last_non_null(options: &HashMap<String, String>) -> bool {
97 options
98 .get(MERGE_MODE_KEY)
99 .map(|mode| mode.eq_ignore_ascii_case("last_non_null"))
100 .unwrap_or(false)
101}
102
/// The flavor of a flow's defining query.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum QueryType {
    /// PromQL-style TQL query.
    Tql,
    /// Plain SQL query.
    Sql,
}
110
/// A batching-mode flow task: shared immutable configuration plus mutable
/// execution state (query context, dirty time windows, shutdown receiver).
#[derive(Clone)]
pub struct BatchingTask {
    pub config: Arc<TaskConfig>,
    pub state: Arc<RwLock<TaskState>>,
}
116
/// Bundle of everything needed to construct a [`BatchingTask`].
pub struct TaskArgs<'a> {
    pub flow_id: FlowId,
    /// The flow's defining query text.
    pub query: &'a str,
    /// Optimized logical plan of `query`; only its output schema is retained
    /// in the resulting task config.
    pub plan: LogicalPlan,
    pub time_window_expr: Option<TimeWindowExpr>,
    /// Data older than this many seconds is considered expired.
    pub expire_after: Option<i64>,
    /// Sink table as `[catalog, schema, table]`.
    pub sink_table_name: [String; 3],
    pub source_table_names: Vec<[String; 3]>,
    pub query_ctx: QueryContextRef,
    pub catalog_manager: CatalogManagerRef,
    /// Receiver used to signal shutdown of the execution loop.
    pub shutdown_rx: oneshot::Receiver<()>,
    pub batch_opts: Arc<BatchingModeOptions>,
    pub flow_eval_interval: Option<Duration>,
}
132
/// A generated plan together with the dirty-window filter (if any) that was
/// folded into it; on execution failure the filter's time ranges are
/// re-marked as dirty so they get retried.
pub struct PlanInfo {
    pub plan: LogicalPlan,
    pub filter: Option<FilterExprInfo>,
}
137
138impl BatchingTask {
139 #[allow(clippy::too_many_arguments)]
140 pub fn try_new(
141 TaskArgs {
142 flow_id,
143 query,
144 plan,
145 time_window_expr,
146 expire_after,
147 sink_table_name,
148 source_table_names,
149 query_ctx,
150 catalog_manager,
151 shutdown_rx,
152 batch_opts,
153 flow_eval_interval,
154 }: TaskArgs<'_>,
155 ) -> Result<Self, Error> {
156 Ok(Self {
157 config: Arc::new(TaskConfig {
158 flow_id,
159 query: query.to_string(),
160 time_window_expr,
161 expire_after,
162 sink_table_name,
163 source_table_names: source_table_names.into_iter().collect(),
164 catalog_manager,
165 output_schema: plan.schema().clone(),
166 query_type: determine_query_type(query, &query_ctx)?,
167 batch_opts,
168 flow_eval_interval,
169 }),
170 state: Arc::new(RwLock::new(TaskState::new(query_ctx, shutdown_rx))),
171 })
172 }
173
174 pub fn last_execution_time_millis(&self) -> Option<i64> {
175 self.state.read().unwrap().last_execution_time_millis()
176 }
177
    /// Marks the whole retention range `[now - expire_after, now]` (or
    /// `[epoch, now]` when no expiry is configured) as dirty, forcing the
    /// next execution to recompute every time window.
    pub fn mark_all_windows_as_dirty(&self) -> Result<(), Error> {
        let now = SystemTime::now();
        let now = Timestamp::new_second(
            now.duration_since(UNIX_EPOCH)
                .expect("Time went backwards")
                .as_secs() as _,
        );
        // Without `expire_after` the lower bound falls back to the epoch.
        let lower_bound = self
            .config
            .expire_after
            .map(|e| now.sub_duration(Duration::from_secs(e as _)))
            .transpose()
            .map_err(BoxedError::new)
            .context(ExternalSnafu)?
            .unwrap_or(Timestamp::new_second(0));
        debug!(
            "Flow {} mark range ({:?}, {:?}) as dirty",
            self.config.flow_id, lower_bound, now
        );
        self.state
            .write()
            .unwrap()
            .dirty_time_windows
            .add_window(lower_bound, Some(now));
        Ok(())
    }
207
    /// Creates the sink table (with `CREATE TABLE IF NOT EXISTS` semantics)
    /// when it does not exist yet; a no-op otherwise.
    ///
    /// Always returns `Ok(None)` on success — the tuple in the return type
    /// only exists to match sibling execution APIs.
    pub async fn check_or_create_sink_table(
        &self,
        engine: &QueryEngineRef,
        frontend_client: &Arc<FrontendClient>,
    ) -> Result<Option<(u32, Duration)>, Error> {
        if !self.is_table_exist(&self.config.sink_table_name).await? {
            // Derive the table definition from the flow query's plan.
            let create_table = self.gen_create_table_expr(engine.clone()).await?;
            info!(
                "Try creating sink table(if not exists) with expr: {:?}",
                create_table
            );
            self.create_table(frontend_client, create_table).await?;
            info!(
                "Sink table {}(if not exists) created",
                self.config.sink_table_name.join(".")
            );
        }

        Ok(None)
    }
229
230 async fn is_table_exist(&self, table_name: &[String; 3]) -> Result<bool, Error> {
231 self.config
232 .catalog_manager
233 .table_exists(&table_name[0], &table_name[1], &table_name[2], None)
234 .await
235 .map_err(BoxedError::new)
236 .context(ExternalSnafu)
237 }
238
239 pub async fn gen_exec_once(
240 &self,
241 engine: &QueryEngineRef,
242 frontend_client: &Arc<FrontendClient>,
243 max_window_cnt: Option<usize>,
244 ) -> Result<Option<(u32, Duration)>, Error> {
245 if let Some(new_query) = self.gen_insert_plan(engine, max_window_cnt).await? {
246 debug!("Generate new query: {}", new_query.plan);
247 self.execute_logical_plan(frontend_client, &new_query.plan)
248 .await
249 } else {
250 debug!("Generate no query");
251 Ok(None)
252 }
253 }
254
    /// Builds the `INSERT INTO <sink> SELECT ...` DML plan for the next
    /// batch, or `Ok(None)` when there is no new data to process.
    pub async fn gen_insert_plan(
        &self,
        engine: &QueryEngineRef,
        max_window_cnt: Option<usize>,
    ) -> Result<Option<PlanInfo>, Error> {
        let (table, df_schema) = get_table_info_df_schema(
            self.config.catalog_manager.clone(),
            self.config.sink_table_name.clone(),
        )
        .await?;

        let table_meta = &table.table_info().meta;
        // `last_non_null` merge mode is forwarded as the `allow_partial`
        // flag of the schema-matching rewrite.
        let merge_mode_last_non_null =
            is_merge_mode_last_non_null(&table_meta.options.extra_options);
        let primary_key_indices = table_meta.primary_key_indices.clone();

        let new_query = self
            .gen_query_with_time_window(
                engine.clone(),
                &table.table_info().meta.schema,
                &primary_key_indices,
                merge_mode_last_non_null,
                max_window_cnt,
            )
            .await?;

        let insert_into_info = if let Some(new_query) = new_query {
            // Every column produced by the query must exist in the sink table.
            let table_columns = df_schema
                .columns()
                .into_iter()
                .map(|c| c.name)
                .collect::<BTreeSet<_>>();
            for column in new_query.plan.schema().columns() {
                ensure!(
                    table_columns.contains(column.name()),
                    InvalidQuerySnafu {
                        reason: format!(
                            "Column {} not found in sink table with columns {:?}",
                            column, table_columns
                        ),
                    }
                );
            }

            let table_provider = Arc::new(DfTableProviderAdapter::new(table));
            let table_source = Arc::new(DefaultTableSource::new(table_provider));

            // Wrap the generated query in an append-mode INSERT targeting
            // the fully-qualified sink table.
            let plan = LogicalPlan::Dml(DmlStatement::new(
                datafusion_common::TableReference::Full {
                    catalog: self.config.sink_table_name[0].clone().into(),
                    schema: self.config.sink_table_name[1].clone().into(),
                    table: self.config.sink_table_name[2].clone().into(),
                },
                table_source,
                WriteOp::Insert(datafusion_expr::dml::InsertOp::Append),
                Arc::new(new_query.plan),
            ));
            PlanInfo {
                plan,
                filter: new_query.filter,
            }
        } else {
            return Ok(None);
        };
        // Recompute schemas after wrapping so the DML node's schema is
        // consistent with its new input.
        let insert_into = insert_into_info
            .plan
            .recompute_schema()
            .context(DatafusionSnafu {
                context: "Failed to recompute schema",
            })?;

        Ok(Some(PlanInfo {
            plan: insert_into,
            filter: insert_into_info.filter,
        }))
    }
334
335 pub async fn create_table(
336 &self,
337 frontend_client: &Arc<FrontendClient>,
338 expr: CreateTableExpr,
339 ) -> Result<(), Error> {
340 let catalog = &self.config.sink_table_name[0];
341 let schema = &self.config.sink_table_name[1];
342 frontend_client
343 .create(expr.clone(), catalog, schema)
344 .await?;
345 Ok(())
346 }
347
    /// Executes `plan` on a frontend node, returning `(affected_rows,
    /// elapsed)` on success.
    ///
    /// Table references are first resolved against the sink table's
    /// catalog/schema, then the plan is encoded as substrait (split into an
    /// explicit `InsertIntoPlan` request when possible) and dispatched. Slow
    /// and failed queries are logged and recorded in metrics, and the task
    /// state is updated regardless of outcome.
    pub async fn execute_logical_plan(
        &self,
        frontend_client: &Arc<FrontendClient>,
        plan: &LogicalPlan,
    ) -> Result<Option<(u32, Duration)>, Error> {
        let instant = Instant::now();
        let flow_id = self.config.flow_id;

        debug!(
            "Executing flow {flow_id}(expire_after={:?} secs) with query {}",
            self.config.expire_after, &plan
        );

        let catalog = &self.config.sink_table_name[0];
        let schema = &self.config.sink_table_name[1];

        // Resolve every table scan's reference against the sink's
        // catalog/schema so the frontend can locate the tables.
        let plan = plan
            .clone()
            .transform_down_with_subqueries(|p| {
                if let LogicalPlan::TableScan(mut table_scan) = p {
                    let resolved = table_scan.table_name.resolve(catalog, schema);
                    table_scan.table_name = resolved.into();
                    Ok(Transformed::yes(LogicalPlan::TableScan(table_scan)))
                } else {
                    Ok(Transformed::no(p))
                }
            })
            .with_context(|_| DatafusionSnafu {
                context: format!("Failed to fix table ref in logical plan, plan={:?}", plan),
            })?
            .data;

        // Filled in by `frontend_client.handle` with the peer that served us.
        let mut peer_desc = None;

        let res = {
            let _timer = METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME
                .with_label_values(&[flow_id.to_string().as_str()])
                .start_timer();

            // Prefer shipping the SELECT part plus an explicit insert target
            // when the plan can be split; otherwise send the whole plan.
            let req = if let Some((insert_to, insert_plan)) =
                breakup_insert_plan(&plan, catalog, schema)
            {
                let message = DFLogicalSubstraitConvertor {}
                    .encode(&insert_plan, DefaultSerializer)
                    .context(SubstraitEncodeLogicalPlanSnafu)?;
                api::v1::greptime_request::Request::Query(api::v1::QueryRequest {
                    query: Some(api::v1::query_request::Query::InsertIntoPlan(
                        api::v1::InsertIntoPlan {
                            table_name: Some(insert_to),
                            logical_plan: message.to_vec(),
                        },
                    )),
                })
            } else {
                let message = DFLogicalSubstraitConvertor {}
                    .encode(&plan, DefaultSerializer)
                    .context(SubstraitEncodeLogicalPlanSnafu)?;

                api::v1::greptime_request::Request::Query(api::v1::QueryRequest {
                    query: Some(api::v1::query_request::Query::LogicalPlan(message.to_vec())),
                })
            };

            frontend_client
                .handle(req, catalog, schema, &mut peer_desc)
                .await
        };

        let elapsed = instant.elapsed();
        if let Ok(affected_rows) = &res {
            debug!(
                "Flow {flow_id} executed, affected_rows: {affected_rows:?}, elapsed: {:?}",
                elapsed
            );
            METRIC_FLOW_ROWS
                .with_label_values(&[format!("{}-out-batching", flow_id).as_str()])
                .inc_by(*affected_rows as _);
        } else if let Err(err) = &res {
            warn!(
                "Failed to execute Flow {flow_id} on frontend {:?}, result: {err:?}, elapsed: {:?} with query: {}",
                peer_desc, elapsed, &plan
            );
        }

        // Surface slow queries in both logs and metrics.
        if elapsed >= self.config.batch_opts.slow_query_threshold {
            warn!(
                "Flow {flow_id} on frontend {:?} executed for {:?} before complete, query: {}",
                peer_desc, elapsed, &plan
            );
            METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY
                .with_label_values(&[
                    flow_id.to_string().as_str(),
                    &peer_desc.unwrap_or_default().to_string(),
                ])
                .observe(elapsed.as_secs_f64());
        }

        // Record the outcome in task state before propagating any error.
        self.state
            .write()
            .unwrap()
            .after_query_exec(elapsed, res.is_ok());

        let res = res?;

        Ok(Some((res, elapsed)))
    }
457
    /// Main execution loop of a batching-mode flow: repeatedly generates an
    /// insert plan from the dirty time windows and executes it, until a
    /// shutdown signal is received (or the shutdown channel is closed).
    pub async fn start_executing_loop(
        &self,
        engine: QueryEngineRef,
        frontend_client: Arc<FrontendClient>,
    ) {
        let flow_id_str = self.config.flow_id.to_string();
        // Cap on dirty-window filters per query; `None` means "use the
        // configured maximum". Shrunk to 1 after a failure, grown back by one
        // per success (see the match arms below).
        let mut max_window_cnt = None;
        let mut interval = self
            .config
            .flow_eval_interval
            .map(|d| tokio::time::interval(d));
        if let Some(tick) = &mut interval {
            // The first tick of a tokio interval completes immediately;
            // consume it so each iteration waits a full interval.
            tick.tick().await;
        }
        loop {
            {
                // Non-blocking shutdown check at the top of every iteration.
                let mut state = self.state.write().unwrap();
                match state.shutdown_rx.try_recv() {
                    Ok(()) => break,
                    Err(TryRecvError::Closed) => {
                        warn!(
                            "Unexpected shutdown flow {}, shutdown anyway",
                            self.config.flow_id
                        );
                        break;
                    }
                    Err(TryRecvError::Empty) => (),
                }
            }
            METRIC_FLOW_BATCHING_ENGINE_START_QUERY_CNT
                .with_label_values(&[&flow_id_str])
                .inc();

            let min_refresh = self.config.batch_opts.experimental_min_refresh_duration;

            let new_query = match self.gen_insert_plan(&engine, max_window_cnt).await {
                Ok(new_query) => new_query,
                Err(err) => {
                    common_telemetry::error!(err; "Failed to generate query for flow={}", self.config.flow_id);
                    // Back off briefly before retrying plan generation.
                    tokio::time::sleep(min_refresh).await;
                    continue;
                }
            };

            let res = if let Some(new_query) = &new_query {
                self.execute_logical_plan(&frontend_client, &new_query.plan)
                    .await
            } else {
                Ok(None)
            };

            match res {
                Ok(Some(_)) => {
                    // Success: allow one more filter next time, capped at the
                    // configured maximum.
                    max_window_cnt = max_window_cnt.map(|cnt| {
                        (cnt + 1).min(self.config.batch_opts.experimental_max_filter_num_per_query)
                    });

                    if let Some(eval_interval) = &mut interval {
                        eval_interval.tick().await;
                    } else {
                        // No fixed interval: ask task state for the next
                        // start time, based on window size and query timeout.
                        let sleep_until = {
                            let state = self.state.write().unwrap();

                            let time_window_size = self
                                .config
                                .time_window_expr
                                .as_ref()
                                .and_then(|t| *t.time_window_size());

                            state.get_next_start_query_time(
                                self.config.flow_id,
                                &time_window_size,
                                min_refresh,
                                Some(self.config.batch_opts.query_timeout),
                                self.config.batch_opts.experimental_max_filter_num_per_query,
                            )
                        };

                        tokio::time::sleep_until(sleep_until).await;
                    };
                }
                Ok(None) => {
                    debug!(
                        "Flow id = {:?} found no new data, sleep for {:?} then continue",
                        self.config.flow_id, min_refresh
                    );
                    tokio::time::sleep(min_refresh).await;
                    continue;
                }
                Err(err) => {
                    METRIC_FLOW_BATCHING_ENGINE_ERROR_CNT
                        .with_label_values(&[&flow_id_str])
                        .inc();
                    match new_query {
                        Some(query) => {
                            common_telemetry::error!(err; "Failed to execute query for flow={} with query: {}", self.config.flow_id, query.plan);
                            // Re-mark the failed query's time ranges as dirty
                            // so they are retried, and shrink the per-query
                            // filter count to 1.
                            self.state.write().unwrap().dirty_time_windows.add_windows(
                                query.filter.map(|f| f.time_ranges).unwrap_or_default(),
                            );
                            max_window_cnt = Some(1);
                        }
                        None => {
                            common_telemetry::error!(err; "Failed to generate query for flow={}", self.config.flow_id)
                        }
                    }
                    tokio::time::sleep(min_refresh).await;
                }
            }
        }
    }
587
588 async fn gen_create_table_expr(
595 &self,
596 engine: QueryEngineRef,
597 ) -> Result<CreateTableExpr, Error> {
598 let query_ctx = self.state.read().unwrap().query_ctx.clone();
599 let plan =
600 sql_to_df_plan(query_ctx.clone(), engine.clone(), &self.config.query, true).await?;
601 create_table_with_expr(&plan, &self.config.sink_table_name, &self.config.query_type)
602 }
603
    /// Generates the flow's query plan restricted to the dirty time windows.
    ///
    /// Returns `Ok(None)` when there is nothing to recompute. For SQL flows
    /// with resolvable time-window bounds, a filter covering at most
    /// `max_window_cnt` dirty windows is injected into the plan; in all
    /// other cases the original query is used as-is (after schema matching
    /// against the sink table) and all dirty windows are cleared.
    async fn gen_query_with_time_window(
        &self,
        engine: QueryEngineRef,
        sink_table_schema: &Arc<Schema>,
        primary_key_indices: &[usize],
        allow_partial: bool,
        max_window_cnt: Option<usize>,
    ) -> Result<Option<PlanInfo>, Error> {
        let query_ctx = self.state.read().unwrap().query_ctx.clone();
        let start = SystemTime::now();
        let since_the_epoch = start
            .duration_since(UNIX_EPOCH)
            .expect("Time went backwards");
        // Earliest still-valid second; data before this point is expired.
        let low_bound = self
            .config
            .expire_after
            .map(|e| since_the_epoch.as_secs() - e as u64)
            .unwrap_or(u64::MIN);

        let low_bound = Timestamp::new_second(low_bound as i64);

        // Align the expiry bound to the flow's time window, if it has one.
        let expire_time_window_bound = self
            .config
            .time_window_expr
            .as_ref()
            .map(|expr| expr.eval(low_bound))
            .transpose()?;

        let (expire_lower_bound, expire_upper_bound) =
            match (expire_time_window_bound, &self.config.query_type) {
                // SQL flow with both window bounds known: fall through to
                // the filtered path below.
                (Some((Some(l), Some(u))), QueryType::Sql) => (l, u),
                // SQL flow without a time window: rerun the full query, but
                // only when something is actually dirty.
                (None, QueryType::Sql) => {
                    debug!(
                        "Flow id = {:?}, no time window, using the same query",
                        self.config.flow_id
                    );
                    let is_dirty = !self.state.read().unwrap().dirty_time_windows.is_empty();
                    self.state.write().unwrap().dirty_time_windows.clean();

                    if !is_dirty {
                        debug!("Flow id={:?}, no new data, not update", self.config.flow_id);
                        return Ok(None);
                    }

                    let plan = gen_plan_with_matching_schema(
                        &self.config.query,
                        query_ctx,
                        engine,
                        sink_table_schema.clone(),
                        primary_key_indices,
                        allow_partial,
                    )
                    .await?;

                    return Ok(Some(PlanInfo { plan, filter: None }));
                }
                // TQL flows and partially-resolved windows: always rerun the
                // full query unfiltered.
                _ => {
                    self.state.write().unwrap().dirty_time_windows.clean();

                    let plan = gen_plan_with_matching_schema(
                        &self.config.query,
                        query_ctx,
                        engine,
                        sink_table_schema.clone(),
                        primary_key_indices,
                        allow_partial,
                    )
                    .await?;

                    return Ok(Some(PlanInfo { plan, filter: None }));
                }
            };

        debug!(
            "Flow id = {:?}, found time window: precise_lower_bound={:?}, precise_upper_bound={:?} with dirty time windows: {:?}",
            self.config.flow_id,
            expire_lower_bound,
            expire_upper_bound,
            self.state.read().unwrap().dirty_time_windows
        );
        let window_size = expire_upper_bound
            .sub(&expire_lower_bound)
            .with_context(|| UnexpectedSnafu {
                reason: format!(
                    "Can't get window size from {expire_upper_bound:?} - {expire_lower_bound:?}"
                ),
            })?;
        let col_name = self
            .config
            .time_window_expr
            .as_ref()
            .map(|expr| expr.column_name.clone())
            .with_context(|| UnexpectedSnafu {
                reason: format!(
                    "Flow id={:?}, Failed to get column name from time window expr",
                    self.config.flow_id
                ),
            })?;

        // Consume up to `max_window_cnt` dirty windows and turn them into a
        // filter over the time-window column.
        let expr = self
            .state
            .write()
            .unwrap()
            .dirty_time_windows
            .gen_filter_exprs(
                &col_name,
                Some(expire_lower_bound),
                window_size,
                max_window_cnt
                    .unwrap_or(self.config.batch_opts.experimental_max_filter_num_per_query),
                self.config.flow_id,
                Some(self),
            )?;

        debug!(
            "Flow id={:?}, Generated filter expr: {:?}",
            self.config.flow_id,
            expr.as_ref()
                .map(
                    |expr| expr_to_sql(&expr.expr).with_context(|_| DatafusionSnafu {
                        context: format!("Failed to generate filter expr from {expr:?}"),
                    })
                )
                .transpose()?
                .map(|s| s.to_string())
        );

        // No filter means no dirty windows remained: nothing to recompute.
        let Some(expr) = expr else {
            debug!("Flow id={:?}, no new data, not update", self.config.flow_id);
            return Ok(None);
        };

        let mut add_filter = AddFilterRewriter::new(expr.expr.clone());
        let mut add_auto_column = ColumnMatcherRewriter::new(
            sink_table_schema.clone(),
            primary_key_indices.to_vec(),
            allow_partial,
        );

        // Rewrite the unoptimized plan (filter first, then column matching),
        // then re-run the optimizer over the result.
        let plan =
            sql_to_df_plan(query_ctx.clone(), engine.clone(), &self.config.query, false).await?;
        let rewrite = plan
            .clone()
            .rewrite(&mut add_filter)
            .and_then(|p| p.data.rewrite(&mut add_auto_column))
            .with_context(|_| DatafusionSnafu {
                context: format!("Failed to rewrite plan:\n {}\n", plan),
            })?
            .data;
        let new_plan = apply_df_optimizer(rewrite, &query_ctx).await?;

        let info = PlanInfo {
            plan: new_plan.clone(),
            filter: Some(expr),
        };

        Ok(Some(info))
    }
770}
771
772fn create_table_with_expr(
775 plan: &LogicalPlan,
776 sink_table_name: &[String; 3],
777 query_type: &QueryType,
778) -> Result<CreateTableExpr, Error> {
779 let table_def = match query_type {
780 &QueryType::Sql => {
781 if let Some(def) = build_pk_from_aggr(plan)? {
782 def
783 } else {
784 build_by_sql_schema(plan)?
785 }
786 }
787 QueryType::Tql => {
788 if let Some(table_def) = build_pk_from_aggr(plan)? {
790 table_def
791 } else {
792 build_by_tql_schema(plan)?
793 }
794 }
795 };
796 let first_time_stamp = table_def.ts_col;
797 let primary_keys = table_def.pks;
798
799 let mut column_schemas = Vec::new();
800 for field in plan.schema().fields() {
801 let name = field.name();
802 let ty = ConcreteDataType::from_arrow_type(field.data_type());
803 let col_schema = if first_time_stamp == Some(name.clone()) {
804 ColumnSchema::new(name, ty, false).with_time_index(true)
805 } else {
806 ColumnSchema::new(name, ty, true)
807 };
808
809 match query_type {
810 QueryType::Sql => {
811 column_schemas.push(col_schema);
812 }
813 QueryType::Tql => {
814 let is_tag_column = primary_keys.contains(name);
817 let is_val_column = !is_tag_column && first_time_stamp.as_ref() != Some(name);
818 if is_val_column {
819 let col_schema =
820 ColumnSchema::new(name, ConcreteDataType::float64_datatype(), true);
821 column_schemas.push(col_schema);
822 } else if is_tag_column {
823 let col_schema =
824 ColumnSchema::new(name, ConcreteDataType::string_datatype(), true);
825 column_schemas.push(col_schema);
826 } else {
827 column_schemas.push(col_schema);
829 }
830 }
831 }
832 }
833
834 if query_type == &QueryType::Sql {
835 let update_at_schema = ColumnSchema::new(
836 AUTO_CREATED_UPDATE_AT_TS_COL,
837 ConcreteDataType::timestamp_millisecond_datatype(),
838 true,
839 );
840 column_schemas.push(update_at_schema);
841 }
842
843 let time_index = if let Some(time_index) = first_time_stamp {
844 time_index
845 } else {
846 column_schemas.push(
847 ColumnSchema::new(
848 AUTO_CREATED_PLACEHOLDER_TS_COL,
849 ConcreteDataType::timestamp_millisecond_datatype(),
850 false,
851 )
852 .with_time_index(true),
853 );
854 AUTO_CREATED_PLACEHOLDER_TS_COL.to_string()
855 };
856
857 let column_defs =
858 column_schemas_to_defs(column_schemas, &primary_keys).context(ConvertColumnSchemaSnafu)?;
859 Ok(CreateTableExpr {
860 catalog_name: sink_table_name[0].clone(),
861 schema_name: sink_table_name[1].clone(),
862 table_name: sink_table_name[2].clone(),
863 desc: "Auto created table by flow engine".to_string(),
864 column_defs,
865 time_index,
866 primary_keys,
867 create_if_not_exists: true,
868 table_options: Default::default(),
869 table_id: None,
870 engine: "mito".to_string(),
871 })
872}
873
874fn build_by_sql_schema(plan: &LogicalPlan) -> Result<TableDef, Error> {
876 let first_time_stamp = plan.schema().fields().iter().find_map(|f| {
877 if ConcreteDataType::from_arrow_type(f.data_type()).is_timestamp() {
878 Some(f.name().clone())
879 } else {
880 None
881 }
882 });
883 Ok(TableDef {
884 ts_col: first_time_stamp,
885 pks: vec![],
886 })
887}
888
889fn build_by_tql_schema(plan: &LogicalPlan) -> Result<TableDef, Error> {
891 let first_time_stamp = plan.schema().fields().iter().find_map(|f| {
892 if ConcreteDataType::from_arrow_type(f.data_type()).is_timestamp() {
893 Some(f.name().clone())
894 } else {
895 None
896 }
897 });
898 let string_columns = plan
899 .schema()
900 .fields()
901 .iter()
902 .filter_map(|f| {
903 if ConcreteDataType::from_arrow_type(f.data_type()).is_string() {
904 Some(f.name().clone())
905 } else {
906 None
907 }
908 })
909 .collect::<Vec<_>>();
910
911 Ok(TableDef {
912 ts_col: first_time_stamp,
913 pks: string_columns,
914 })
915}
916
/// Sink-table key derivation result: the chosen time-index column (if any)
/// and the primary-key column names.
struct TableDef {
    ts_col: Option<String>,
    pks: Vec<String>,
}
921
922fn build_pk_from_aggr(plan: &LogicalPlan) -> Result<Option<TableDef>, Error> {
931 let fields = plan.schema().fields();
932 let mut pk_names = FindGroupByFinalName::default();
933
934 plan.visit(&mut pk_names)
935 .with_context(|_| DatafusionSnafu {
936 context: format!("Can't find aggr expr in plan {plan:?}"),
937 })?;
938
939 let Some(pk_final_names) = pk_names.get_group_expr_names() else {
941 return Ok(None);
942 };
943 if pk_final_names.is_empty() {
944 let first_ts_col = fields
945 .iter()
946 .find(|f| ConcreteDataType::from_arrow_type(f.data_type()).is_timestamp())
947 .map(|f| f.name().clone());
948 return Ok(Some(TableDef {
949 ts_col: first_ts_col,
950 pks: vec![],
951 }));
952 }
953
954 let all_pk_cols: Vec<_> = fields
955 .iter()
956 .filter(|f| pk_final_names.contains(f.name()))
957 .map(|f| f.name().clone())
958 .collect();
959 let first_time_stamp = fields
961 .iter()
962 .find(|f| {
963 all_pk_cols.contains(&f.name().clone())
964 && ConcreteDataType::from_arrow_type(f.data_type()).is_timestamp()
965 })
966 .map(|f| f.name().clone());
967
968 let all_pk_cols: Vec<_> = all_pk_cols
969 .into_iter()
970 .filter(|col| first_time_stamp.as_ref() != Some(col))
971 .collect();
972
973 Ok(Some(TableDef {
974 ts_col: first_time_stamp,
975 pks: all_pk_cols,
976 }))
977}
978
#[cfg(test)]
mod test {
    use api::v1::column_def::try_as_column_schema;
    use pretty_assertions::assert_eq;
    use session::context::QueryContext;

    use super::*;
    use crate::test_utils::create_test_query_engine;

    /// Verifies that `create_table_with_expr` derives column schemas,
    /// primary keys and the time index correctly for representative SQL
    /// queries (with and without GROUP BY / timestamp columns).
    #[tokio::test]
    async fn test_gen_create_table_sql() {
        let query_engine = create_test_query_engine();
        let ctx = QueryContext::arc();
        struct TestCase {
            sql: String,
            sink_table_name: String,
            column_schemas: Vec<ColumnSchema>,
            primary_keys: Vec<String>,
            time_index: String,
        }

        // Expected auto-added `update_at` column for SQL flows.
        let update_at_schema = ColumnSchema::new(
            AUTO_CREATED_UPDATE_AT_TS_COL,
            ConcreteDataType::timestamp_millisecond_datatype(),
            true,
        );

        // Expected placeholder time index when the plan has no usable one.
        let ts_placeholder_schema = ColumnSchema::new(
            AUTO_CREATED_PLACEHOLDER_TS_COL,
            ConcreteDataType::timestamp_millisecond_datatype(),
            false,
        )
        .with_time_index(true);

        let testcases = vec![
            TestCase {
                sql: "SELECT number, ts FROM numbers_with_ts".to_string(),
                sink_table_name: "new_table".to_string(),
                column_schemas: vec![
                    ColumnSchema::new("number", ConcreteDataType::uint32_datatype(), true),
                    ColumnSchema::new(
                        "ts",
                        ConcreteDataType::timestamp_millisecond_datatype(),
                        false,
                    )
                    .with_time_index(true),
                    update_at_schema.clone(),
                ],
                primary_keys: vec![],
                time_index: "ts".to_string(),
            },
            TestCase {
                sql: "SELECT number, max(ts) FROM numbers_with_ts GROUP BY number".to_string(),
                sink_table_name: "new_table".to_string(),
                column_schemas: vec![
                    ColumnSchema::new("number", ConcreteDataType::uint32_datatype(), true),
                    ColumnSchema::new(
                        "max(numbers_with_ts.ts)",
                        ConcreteDataType::timestamp_millisecond_datatype(),
                        true,
                    ),
                    update_at_schema.clone(),
                    ts_placeholder_schema.clone(),
                ],
                primary_keys: vec!["number".to_string()],
                time_index: AUTO_CREATED_PLACEHOLDER_TS_COL.to_string(),
            },
            TestCase {
                sql: "SELECT max(number), ts FROM numbers_with_ts GROUP BY ts".to_string(),
                sink_table_name: "new_table".to_string(),
                column_schemas: vec![
                    ColumnSchema::new(
                        "max(numbers_with_ts.number)",
                        ConcreteDataType::uint32_datatype(),
                        true,
                    ),
                    ColumnSchema::new(
                        "ts",
                        ConcreteDataType::timestamp_millisecond_datatype(),
                        false,
                    )
                    .with_time_index(true),
                    update_at_schema.clone(),
                ],
                primary_keys: vec![],
                time_index: "ts".to_string(),
            },
            TestCase {
                sql: "SELECT number, ts FROM numbers_with_ts GROUP BY ts, number".to_string(),
                sink_table_name: "new_table".to_string(),
                column_schemas: vec![
                    ColumnSchema::new("number", ConcreteDataType::uint32_datatype(), true),
                    ColumnSchema::new(
                        "ts",
                        ConcreteDataType::timestamp_millisecond_datatype(),
                        false,
                    )
                    .with_time_index(true),
                    update_at_schema.clone(),
                ],
                primary_keys: vec!["number".to_string()],
                time_index: "ts".to_string(),
            },
        ];

        for tc in testcases {
            let plan = sql_to_df_plan(ctx.clone(), query_engine.clone(), &tc.sql, true)
                .await
                .unwrap();
            let expr = create_table_with_expr(
                &plan,
                &[
                    "greptime".to_string(),
                    "public".to_string(),
                    tc.sink_table_name.clone(),
                ],
                &QueryType::Sql,
            )
            .unwrap();
            // Round-trip the column defs back to schemas for comparison.
            let column_schemas = expr
                .column_defs
                .iter()
                .map(|c| try_as_column_schema(c).unwrap())
                .collect::<Vec<_>>();
            assert_eq!(tc.column_schemas, column_schemas, "{:?}", tc.sql);
            assert_eq!(tc.primary_keys, expr.primary_keys, "{:?}", tc.sql);
            assert_eq!(tc.time_index, expr.time_index, "{:?}", tc.sql);
        }
    }
}
1109}