use std::collections::{BTreeSet, HashMap, HashSet};
use std::sync::{Arc, RwLock};
use std::time::{Duration, SystemTime, UNIX_EPOCH};

use api::v1::CreateTableExpr;
use catalog::CatalogManagerRef;
use common_error::ext::BoxedError;
use common_query::logical_plan::breakup_insert_plan;
use common_telemetry::tracing::warn;
use common_telemetry::{debug, info};
use common_time::Timestamp;
use datafusion::datasource::DefaultTableSource;
use datafusion::sql::unparser::expr_to_sql;
use datafusion_common::DFSchemaRef;
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_expr::{DmlStatement, LogicalPlan, WriteOp};
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnSchema, Schema};
use operator::expr_helper::column_schemas_to_defs;
use query::QueryEngineRef;
use query::query_engine::DefaultSerializer;
use session::context::QueryContextRef;
use snafu::{OptionExt, ResultExt, ensure};
use sql::parser::{ParseOptions, ParserContext};
use sql::statements::statement::Statement;
use store_api::mito_engine_options::MERGE_MODE_KEY;
use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
use table::table::adapter::DfTableProviderAdapter;
use tokio::sync::oneshot;
use tokio::sync::oneshot::error::TryRecvError;
use tokio::time::Instant;

use crate::adapter::{AUTO_CREATED_PLACEHOLDER_TS_COL, AUTO_CREATED_UPDATE_AT_TS_COL};
use crate::batching_mode::BatchingModeOptions;
use crate::batching_mode::frontend_client::FrontendClient;
use crate::batching_mode::state::{FilterExprInfo, TaskState};
use crate::batching_mode::time_window::TimeWindowExpr;
use crate::batching_mode::utils::{
    AddFilterRewriter, ColumnMatcherRewriter, FindGroupByFinalName, gen_plan_with_matching_schema,
    get_table_info_df_schema, sql_to_df_plan,
};
use crate::df_optimizer::apply_df_optimizer;
use crate::error::{
    ConvertColumnSchemaSnafu, DatafusionSnafu, ExternalSnafu, InvalidQuerySnafu,
    SubstraitEncodeLogicalPlanSnafu, UnexpectedSnafu,
};
use crate::metrics::{
    METRIC_FLOW_BATCHING_ENGINE_ERROR_CNT, METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME,
    METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY, METRIC_FLOW_BATCHING_ENGINE_START_QUERY_CNT,
    METRIC_FLOW_ROWS,
};
use crate::{Error, FlowId};

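/// The immutable configuration of a batching-mode flow task; per-execution
/// mutable state lives in [`TaskState`].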
#[derive(Clone)]
pub struct TaskConfig {
    pub flow_id: FlowId,
    pub query: String,
    pub output_schema: DFSchemaRef,
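    /// The expression used to derive the query's time window, if one was found.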
    pub time_window_expr: Option<TimeWindowExpr>,
    pub expire_after: Option<i64>,
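    /// The sink table name as `[catalog, schema, table]`.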
    pub sink_table_name: [String; 3],
    pub source_table_names: HashSet<[String; 3]>,
    pub catalog_manager: CatalogManagerRef,
    pub query_type: QueryType,
    pub batch_opts: Arc<BatchingModeOptions>,
    pub flow_eval_interval: Option<Duration>,
}

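/// Parses `query` with the session's SQL dialect and classifies it as TQL or
/// SQL, returning an error unless exactly one statement is found.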
fn determine_query_type(query: &str, query_ctx: &QueryContextRef) -> Result<QueryType, Error> {
    let stmts =
        ParserContext::create_with_dialect(query, query_ctx.sql_dialect(), ParseOptions::default())
            .map_err(BoxedError::new)
            .context(ExternalSnafu)?;

    ensure!(
        stmts.len() == 1,
        InvalidQuerySnafu {
            reason: format!("Expect only one statement, found {}", stmts.len())
        }
    );
    let stmt = &stmts[0];
    match stmt {
        Statement::Tql(_) => Ok(QueryType::Tql),
        _ => Ok(QueryType::Sql),
    }
}

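/// Returns true if the table's `merge_mode` option is `last_non_null`
/// (compared case-insensitively).
///
/// A minimal sketch of the expected input shape (values are illustrative):
///
/// ```ignore
/// let mut options = HashMap::new();
/// options.insert(MERGE_MODE_KEY.to_string(), "LAST_NON_NULL".to_string());
/// assert!(is_merge_mode_last_non_null(&options));
/// ```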
fn is_merge_mode_last_non_null(options: &HashMap<String, String>) -> bool {
    options
        .get(MERGE_MODE_KEY)
        .map(|mode| mode.eq_ignore_ascii_case("last_non_null"))
        .unwrap_or(false)
}

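/// The kind of query that defines a flow.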
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum QueryType {
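    /// A TQL (PromQL-style) query.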
    Tql,
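    /// A SQL query.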
    Sql,
}

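/// A batching-mode task that repeatedly evaluates a flow's query over its
/// dirty time windows and writes the result into the sink table.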
#[derive(Clone)]
pub struct BatchingTask {
    pub config: Arc<TaskConfig>,
    pub state: Arc<RwLock<TaskState>>,
}

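/// Arguments for [`BatchingTask::try_new`].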
pub struct TaskArgs<'a> {
    pub flow_id: FlowId,
    pub query: &'a str,
    pub plan: LogicalPlan,
    pub time_window_expr: Option<TimeWindowExpr>,
    pub expire_after: Option<i64>,
    pub sink_table_name: [String; 3],
    pub source_table_names: Vec<[String; 3]>,
    pub query_ctx: QueryContextRef,
    pub catalog_manager: CatalogManagerRef,
    pub shutdown_rx: oneshot::Receiver<()>,
    pub batch_opts: Arc<BatchingModeOptions>,
    pub flow_eval_interval: Option<Duration>,
}

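/// A generated query plan together with the dirty-window filter it covers, if any.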
pub struct PlanInfo {
    pub plan: LogicalPlan,
    pub filter: Option<FilterExprInfo>,
}

impl BatchingTask {
    pub fn try_new(
        TaskArgs {
            flow_id,
            query,
            plan,
            time_window_expr,
            expire_after,
            sink_table_name,
            source_table_names,
            query_ctx,
            catalog_manager,
            shutdown_rx,
            batch_opts,
            flow_eval_interval,
        }: TaskArgs<'_>,
    ) -> Result<Self, Error> {
        Ok(Self {
            config: Arc::new(TaskConfig {
                flow_id,
                query: query.to_string(),
                time_window_expr,
                expire_after,
                sink_table_name,
                source_table_names: source_table_names.into_iter().collect(),
                catalog_manager,
                output_schema: plan.schema().clone(),
                query_type: determine_query_type(query, &query_ctx)?,
                batch_opts,
                flow_eval_interval,
            }),
            state: Arc::new(RwLock::new(TaskState::new(query_ctx, shutdown_rx))),
        })
    }

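    /// Marks the whole range `(now - expire_after, now]` as dirty so the next
    /// execution recomputes it; when `expire_after` is unset, everything since
    /// the Unix epoch is marked dirty.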
    pub fn mark_all_windows_as_dirty(&self) -> Result<(), Error> {
        let now = SystemTime::now();
        let now = Timestamp::new_second(
            now.duration_since(UNIX_EPOCH)
                .expect("Time went backwards")
                .as_secs() as _,
        );
        let lower_bound = self
            .config
            .expire_after
            .map(|e| now.sub_duration(Duration::from_secs(e as _)))
            .transpose()
            .map_err(BoxedError::new)
            .context(ExternalSnafu)?
            .unwrap_or(Timestamp::new_second(0));
        debug!(
            "Flow {} mark range ({:?}, {:?}) as dirty",
            self.config.flow_id, lower_bound, now
        );
        self.state
            .write()
            .unwrap()
            .dirty_time_windows
            .add_window(lower_bound, Some(now));
        Ok(())
    }

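    /// Creates the sink table with `CREATE TABLE IF NOT EXISTS` semantics when
    /// it does not exist yet. No rows are written, so this always returns
    /// `Ok(None)`.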
    pub async fn check_or_create_sink_table(
        &self,
        engine: &QueryEngineRef,
        frontend_client: &Arc<FrontendClient>,
    ) -> Result<Option<(u32, Duration)>, Error> {
        if !self.is_table_exist(&self.config.sink_table_name).await? {
            let create_table = self.gen_create_table_expr(engine.clone()).await?;
            info!(
                "Try creating sink table(if not exists) with expr: {:?}",
                create_table
            );
            self.create_table(frontend_client, create_table).await?;
            info!(
                "Sink table {}(if not exists) created",
                self.config.sink_table_name.join(".")
            );
        }

        Ok(None)
    }

    async fn is_table_exist(&self, table_name: &[String; 3]) -> Result<bool, Error> {
        self.config
            .catalog_manager
            .table_exists(&table_name[0], &table_name[1], &table_name[2], None)
            .await
            .map_err(BoxedError::new)
            .context(ExternalSnafu)
    }

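    /// Generates an insert plan for the current dirty time windows and executes
    /// it once, returning the affected row count and elapsed time if a query ran.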
    pub async fn gen_exec_once(
        &self,
        engine: &QueryEngineRef,
        frontend_client: &Arc<FrontendClient>,
        max_window_cnt: Option<usize>,
    ) -> Result<Option<(u32, Duration)>, Error> {
        if let Some(new_query) = self.gen_insert_plan(engine, max_window_cnt).await? {
            debug!("Generate new query: {}", new_query.plan);
            self.execute_logical_plan(frontend_client, &new_query.plan)
                .await
        } else {
            debug!("Generate no query");
            Ok(None)
        }
    }

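    /// Builds an `INSERT INTO <sink table> SELECT ...` logical plan covering the
    /// current dirty time windows, or returns `None` when there is nothing to do.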
    pub async fn gen_insert_plan(
        &self,
        engine: &QueryEngineRef,
        max_window_cnt: Option<usize>,
    ) -> Result<Option<PlanInfo>, Error> {
        let (table, df_schema) = get_table_info_df_schema(
            self.config.catalog_manager.clone(),
            self.config.sink_table_name.clone(),
        )
        .await?;

        let table_meta = &table.table_info().meta;
        let merge_mode_last_non_null =
            is_merge_mode_last_non_null(&table_meta.options.extra_options);
        let primary_key_indices = table_meta.primary_key_indices.clone();

        let new_query = self
            .gen_query_with_time_window(
                engine.clone(),
                &table.table_info().meta.schema,
                &primary_key_indices,
                merge_mode_last_non_null,
                max_window_cnt,
            )
            .await?;

        let insert_into_info = if let Some(new_query) = new_query {
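            // Ensure every column produced by the query exists in the sink
            // table; otherwise the insert would fail at execution time.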
            let table_columns = df_schema
                .columns()
                .into_iter()
                .map(|c| c.name)
                .collect::<BTreeSet<_>>();
            for column in new_query.plan.schema().columns() {
                ensure!(
                    table_columns.contains(column.name()),
                    InvalidQuerySnafu {
                        reason: format!(
                            "Column {} not found in sink table with columns {:?}",
                            column, table_columns
                        ),
                    }
                );
            }

            let table_provider = Arc::new(DfTableProviderAdapter::new(table));
            let table_source = Arc::new(DefaultTableSource::new(table_provider));

            let plan = LogicalPlan::Dml(DmlStatement::new(
                datafusion_common::TableReference::Full {
                    catalog: self.config.sink_table_name[0].clone().into(),
                    schema: self.config.sink_table_name[1].clone().into(),
                    table: self.config.sink_table_name[2].clone().into(),
                },
                table_source,
                WriteOp::Insert(datafusion_expr::dml::InsertOp::Append),
                Arc::new(new_query.plan),
            ));
            PlanInfo {
                plan,
                filter: new_query.filter,
            }
        } else {
            return Ok(None);
        };
        let insert_into = insert_into_info
            .plan
            .recompute_schema()
            .context(DatafusionSnafu {
                context: "Failed to recompute schema",
            })?;

        Ok(Some(PlanInfo {
            plan: insert_into,
            filter: insert_into_info.filter,
        }))
    }

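    /// Sends the `CREATE TABLE` expression to a frontend node.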
    pub async fn create_table(
        &self,
        frontend_client: &Arc<FrontendClient>,
        expr: CreateTableExpr,
    ) -> Result<(), Error> {
        let catalog = &self.config.sink_table_name[0];
        let schema = &self.config.sink_table_name[1];
        frontend_client
            .create(expr.clone(), catalog, schema)
            .await?;
        Ok(())
    }

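    /// Executes the given plan on a frontend node, first resolving unqualified
    /// table references against the sink table's catalog and schema. Returns
    /// the affected row count and the elapsed time.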
    pub async fn execute_logical_plan(
        &self,
        frontend_client: &Arc<FrontendClient>,
        plan: &LogicalPlan,
    ) -> Result<Option<(u32, Duration)>, Error> {
        let instant = Instant::now();
        let flow_id = self.config.flow_id;

        debug!(
            "Executing flow {flow_id}(expire_after={:?} secs) with query {}",
            self.config.expire_after, &plan
        );

        let catalog = &self.config.sink_table_name[0];
        let schema = &self.config.sink_table_name[1];

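        // Qualify bare table names with the sink table's catalog and schema so
        // the frontend can resolve them.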
        let plan = plan
            .clone()
            .transform_down_with_subqueries(|p| {
                if let LogicalPlan::TableScan(mut table_scan) = p {
                    let resolved = table_scan.table_name.resolve(catalog, schema);
                    table_scan.table_name = resolved.into();
                    Ok(Transformed::yes(LogicalPlan::TableScan(table_scan)))
                } else {
                    Ok(Transformed::no(p))
                }
            })
            .with_context(|_| DatafusionSnafu {
                context: format!("Failed to fix table ref in logical plan, plan={:?}", plan),
            })?
            .data;

        let mut peer_desc = None;

        let res = {
            let _timer = METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME
                .with_label_values(&[flow_id.to_string().as_str()])
                .start_timer();

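            // If the plan can be split into an insert target plus a query,
            // send it as an `InsertIntoPlan` request; otherwise send the whole
            // plan as a substrait-encoded logical plan.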
            let req = if let Some((insert_to, insert_plan)) =
                breakup_insert_plan(&plan, catalog, schema)
            {
                let message = DFLogicalSubstraitConvertor {}
                    .encode(&insert_plan, DefaultSerializer)
                    .context(SubstraitEncodeLogicalPlanSnafu)?;
                api::v1::greptime_request::Request::Query(api::v1::QueryRequest {
                    query: Some(api::v1::query_request::Query::InsertIntoPlan(
                        api::v1::InsertIntoPlan {
                            table_name: Some(insert_to),
                            logical_plan: message.to_vec(),
                        },
                    )),
                })
            } else {
                let message = DFLogicalSubstraitConvertor {}
                    .encode(&plan, DefaultSerializer)
                    .context(SubstraitEncodeLogicalPlanSnafu)?;

                api::v1::greptime_request::Request::Query(api::v1::QueryRequest {
                    query: Some(api::v1::query_request::Query::LogicalPlan(message.to_vec())),
                })
            };

            frontend_client
                .handle(req, catalog, schema, &mut peer_desc)
                .await
        };

        let elapsed = instant.elapsed();
        if let Ok(affected_rows) = &res {
            debug!(
                "Flow {flow_id} executed, affected_rows: {affected_rows:?}, elapsed: {:?}",
                elapsed
            );
            METRIC_FLOW_ROWS
                .with_label_values(&[format!("{}-out-batching", flow_id).as_str()])
                .inc_by(*affected_rows as _);
        } else if let Err(err) = &res {
            warn!(
                "Failed to execute Flow {flow_id} on frontend {:?}, result: {err:?}, elapsed: {:?} with query: {}",
                peer_desc, elapsed, &plan
            );
        }

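        // Log and record queries that exceed the slow-query threshold.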
        if elapsed >= self.config.batch_opts.slow_query_threshold {
            warn!(
                "Flow {flow_id} on frontend {:?} executed for {:?} before complete, query: {}",
                peer_desc, elapsed, &plan
            );
            METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY
                .with_label_values(&[
                    flow_id.to_string().as_str(),
                    &peer_desc.unwrap_or_default().to_string(),
                ])
                .observe(elapsed.as_secs_f64());
        }

        self.state
            .write()
            .unwrap()
            .after_query_exec(elapsed, res.is_ok());

        let res = res?;

        Ok(Some((res, elapsed)))
    }

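    /// The main execution loop: repeatedly evaluates the flow over its dirty
    /// time windows until a shutdown signal arrives, sleeping between rounds
    /// and backing off for the minimum refresh duration on errors or when no
    /// new data is found.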
    pub async fn start_executing_loop(
        &self,
        engine: QueryEngineRef,
        frontend_client: Arc<FrontendClient>,
    ) {
        let flow_id_str = self.config.flow_id.to_string();
        let mut max_window_cnt = None;
        let mut interval = self
            .config
            .flow_eval_interval
            .map(tokio::time::interval);
        // The first tick of a tokio interval completes immediately; consume it
        // so the loop below waits a full period between evaluations.
        if let Some(tick) = &mut interval {
            tick.tick().await;
        }
        loop {
            // Check for a shutdown signal before each round, in its own scope
            // so the state lock is released immediately.
            {
                let mut state = self.state.write().unwrap();
                match state.shutdown_rx.try_recv() {
                    Ok(()) => break,
                    Err(TryRecvError::Closed) => {
                        warn!(
                            "Unexpected shutdown flow {}, shutdown anyway",
                            self.config.flow_id
                        );
                        break;
                    }
                    Err(TryRecvError::Empty) => (),
                }
            }
            METRIC_FLOW_BATCHING_ENGINE_START_QUERY_CNT
                .with_label_values(&[&flow_id_str])
                .inc();

            let min_refresh = self.config.batch_opts.experimental_min_refresh_duration;

            let new_query = match self.gen_insert_plan(&engine, max_window_cnt).await {
                Ok(new_query) => new_query,
                Err(err) => {
                    common_telemetry::error!(err; "Failed to generate query for flow={}", self.config.flow_id);
                    tokio::time::sleep(min_refresh).await;
                    continue;
                }
            };

            let res = if let Some(new_query) = &new_query {
                self.execute_logical_plan(&frontend_client, &new_query.plan)
                    .await
            } else {
                Ok(None)
            };

            match res {
                Ok(Some(_)) => {
                    // A successful round: allow the next query to cover one
                    // more time window, up to the configured maximum.
                    max_window_cnt = max_window_cnt.map(|cnt| {
                        (cnt + 1).min(self.config.batch_opts.experimental_max_filter_num_per_query)
                    });

                    if let Some(eval_interval) = &mut interval {
                        eval_interval.tick().await;
                    } else {
                        // No fixed eval interval: compute when the next round
                        // should start from the task state.
                        let sleep_until = {
                            let state = self.state.write().unwrap();

                            let time_window_size = self
                                .config
                                .time_window_expr
                                .as_ref()
                                .and_then(|t| *t.time_window_size());

                            state.get_next_start_query_time(
                                self.config.flow_id,
                                &time_window_size,
                                min_refresh,
                                Some(self.config.batch_opts.query_timeout),
                                self.config.batch_opts.experimental_max_filter_num_per_query,
                            )
                        };

                        tokio::time::sleep_until(sleep_until).await;
                    };
                }
                Ok(None) => {
                    debug!(
                        "Flow id = {:?} found no new data, sleep for {:?} then continue",
                        self.config.flow_id, min_refresh
                    );
                    tokio::time::sleep(min_refresh).await;
                    continue;
                }
                Err(err) => {
                    METRIC_FLOW_BATCHING_ENGINE_ERROR_CNT
                        .with_label_values(&[&flow_id_str])
                        .inc();
                    match new_query {
                        Some(query) => {
                            common_telemetry::error!(err; "Failed to execute query for flow={} with query: {}", self.config.flow_id, query.plan);
                            // Put the failed time windows back so they are
                            // retried in the next round.
                            self.state.write().unwrap().dirty_time_windows.add_windows(
                                query.filter.map(|f| f.time_ranges).unwrap_or_default(),
                            );
                            // Shrink the next query back to a single time
                            // window to reduce the chance of failing again.
                            max_window_cnt = Some(1);
                        }
                        None => {
                            common_telemetry::error!(err; "Failed to generate query for flow={}", self.config.flow_id)
                        }
                    }
                    tokio::time::sleep(min_refresh).await;
                }
            }
        }
    }

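    /// Derives a `CREATE TABLE` expression for the sink table from the
    /// optimized plan's output schema.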
    async fn gen_create_table_expr(
        &self,
        engine: QueryEngineRef,
    ) -> Result<CreateTableExpr, Error> {
        let query_ctx = self.state.read().unwrap().query_ctx.clone();
        let plan =
            sql_to_df_plan(query_ctx.clone(), engine.clone(), &self.config.query, true).await?;
        create_table_with_expr(&plan, &self.config.sink_table_name, &self.config.query_type)
    }

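    /// Rewrites the flow's query with a filter over the dirty time windows
    /// (limited to `max_window_cnt` windows) and a projection matching the
    /// sink table's schema. Returns `None` when there is no dirty data.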
    async fn gen_query_with_time_window(
        &self,
        engine: QueryEngineRef,
        sink_table_schema: &Arc<Schema>,
        primary_key_indices: &[usize],
        allow_partial: bool,
        max_window_cnt: Option<usize>,
    ) -> Result<Option<PlanInfo>, Error> {
        let query_ctx = self.state.read().unwrap().query_ctx.clone();
        let start = SystemTime::now();
        let since_the_epoch = start
            .duration_since(UNIX_EPOCH)
            .expect("Time went backwards");
        let low_bound = self
            .config
            .expire_after
            .map(|e| since_the_epoch.as_secs() - e as u64)
            .unwrap_or(u64::MIN);

        let low_bound = Timestamp::new_second(low_bound as i64);

        let expire_time_window_bound = self
            .config
            .time_window_expr
            .as_ref()
            .map(|expr| expr.eval(low_bound))
            .transpose()?;

        let (expire_lower_bound, expire_upper_bound) =
            match (expire_time_window_bound, &self.config.query_type) {
                (Some((Some(l), Some(u))), QueryType::Sql) => (l, u),
                (None, QueryType::Sql) => {
                    // A SQL query without a time window: rerun the full query
                    // whenever any dirty data exists.
                    debug!(
                        "Flow id = {:?}, no time window, using the same query",
                        self.config.flow_id
                    );
                    let is_dirty = !self.state.read().unwrap().dirty_time_windows.is_empty();
                    self.state.write().unwrap().dirty_time_windows.clean();

                    if !is_dirty {
                        debug!("Flow id={:?}, no new data, not update", self.config.flow_id);
                        return Ok(None);
                    }

                    let plan = gen_plan_with_matching_schema(
                        &self.config.query,
                        query_ctx,
                        engine,
                        sink_table_schema.clone(),
                        primary_key_indices,
                        allow_partial,
                    )
                    .await?;

                    return Ok(Some(PlanInfo { plan, filter: None }));
                }
                _ => {
                    // TQL queries, or time-window bounds that are only
                    // partially known: rerun the full query without a
                    // dirty-window filter.
                    self.state.write().unwrap().dirty_time_windows.clean();

                    let plan = gen_plan_with_matching_schema(
                        &self.config.query,
                        query_ctx,
                        engine,
                        sink_table_schema.clone(),
                        primary_key_indices,
                        allow_partial,
                    )
                    .await?;

                    return Ok(Some(PlanInfo { plan, filter: None }));
                }
            };

        debug!(
            "Flow id = {:?}, found time window: precise_lower_bound={:?}, precise_upper_bound={:?} with dirty time windows: {:?}",
            self.config.flow_id,
            expire_lower_bound,
            expire_upper_bound,
            self.state.read().unwrap().dirty_time_windows
        );
        let window_size = expire_upper_bound
            .sub(&expire_lower_bound)
            .with_context(|| UnexpectedSnafu {
                reason: format!(
                    "Can't get window size from {expire_upper_bound:?} - {expire_lower_bound:?}"
                ),
            })?;
        let col_name = self
            .config
            .time_window_expr
            .as_ref()
            .map(|expr| expr.column_name.clone())
            .with_context(|| UnexpectedSnafu {
                reason: format!(
                    "Flow id={:?}, Failed to get column name from time window expr",
                    self.config.flow_id
                ),
            })?;

        let expr = self
            .state
            .write()
            .unwrap()
            .dirty_time_windows
            .gen_filter_exprs(
                &col_name,
                Some(expire_lower_bound),
                window_size,
                max_window_cnt
                    .unwrap_or(self.config.batch_opts.experimental_max_filter_num_per_query),
                self.config.flow_id,
                Some(self),
            )?;

        debug!(
            "Flow id={:?}, Generated filter expr: {:?}",
            self.config.flow_id,
            expr.as_ref()
                .map(
                    |expr| expr_to_sql(&expr.expr).with_context(|_| DatafusionSnafu {
                        context: format!("Failed to generate filter expr from {expr:?}"),
                    })
                )
                .transpose()?
                .map(|s| s.to_string())
        );

        let Some(expr) = expr else {
            // No dirty windows means no new data to process.
            debug!("Flow id={:?}, no new data, not update", self.config.flow_id);
            return Ok(None);
        };

        let mut add_filter = AddFilterRewriter::new(expr.expr.clone());
        let mut add_auto_column = ColumnMatcherRewriter::new(
            sink_table_schema.clone(),
            primary_key_indices.to_vec(),
            allow_partial,
        );

        let plan =
            sql_to_df_plan(query_ctx.clone(), engine.clone(), &self.config.query, false).await?;
        let rewrite = plan
            .clone()
            .rewrite(&mut add_filter)
            .and_then(|p| p.data.rewrite(&mut add_auto_column))
            .with_context(|_| DatafusionSnafu {
                context: format!("Failed to rewrite plan:\n {}\n", plan),
            })?
            .data;
        // Run the optimizer over the rewritten plan before execution.
        let new_plan = apply_df_optimizer(rewrite, &query_ctx).await?;

        let info = PlanInfo {
            plan: new_plan,
            filter: Some(expr),
        };

        Ok(Some(info))
    }
}

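/// Builds the `CREATE TABLE` expression for the sink table from the plan's
/// output schema, deriving the time index and primary keys per query type.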
fn create_table_with_expr(
    plan: &LogicalPlan,
    sink_table_name: &[String; 3],
    query_type: &QueryType,
) -> Result<CreateTableExpr, Error> {
    let table_def = match query_type {
        QueryType::Sql => {
            if let Some(def) = build_pk_from_aggr(plan)? {
                def
            } else {
                build_by_sql_schema(plan)?
            }
        }
        QueryType::Tql => {
            // Prefer primary keys derived from GROUP BY; fall back to the TQL
            // schema convention (string columns as tags).
            if let Some(table_def) = build_pk_from_aggr(plan)? {
                table_def
            } else {
                build_by_tql_schema(plan)?
            }
        }
    };
    let first_time_stamp = table_def.ts_col;
    let primary_keys = table_def.pks;

    let mut column_schemas = Vec::new();
    for field in plan.schema().fields() {
        let name = field.name();
        let ty = ConcreteDataType::from_arrow_type(field.data_type());
        let col_schema = if first_time_stamp == Some(name.clone()) {
            ColumnSchema::new(name, ty, false).with_time_index(true)
        } else {
            ColumnSchema::new(name, ty, true)
        };

        match query_type {
            QueryType::Sql => {
                column_schemas.push(col_schema);
            }
            QueryType::Tql => {
                // TQL results store tag columns as strings and value columns
                // as float64; the time index keeps its original type.
                let is_tag_column = primary_keys.contains(name);
                let is_val_column = !is_tag_column && first_time_stamp.as_ref() != Some(name);
                if is_val_column {
                    let col_schema =
                        ColumnSchema::new(name, ConcreteDataType::float64_datatype(), true);
                    column_schemas.push(col_schema);
                } else if is_tag_column {
                    let col_schema =
                        ColumnSchema::new(name, ConcreteDataType::string_datatype(), true);
                    column_schemas.push(col_schema);
                } else {
                    column_schemas.push(col_schema);
                }
            }
        }
    }

    if query_type == &QueryType::Sql {
        let update_at_schema = ColumnSchema::new(
            AUTO_CREATED_UPDATE_AT_TS_COL,
            ConcreteDataType::timestamp_millisecond_datatype(),
            true,
        );
        column_schemas.push(update_at_schema);
    }

    let time_index = if let Some(time_index) = first_time_stamp {
        time_index
    } else {
        // No timestamp column in the output: append a placeholder time index.
        column_schemas.push(
            ColumnSchema::new(
                AUTO_CREATED_PLACEHOLDER_TS_COL,
                ConcreteDataType::timestamp_millisecond_datatype(),
                false,
            )
            .with_time_index(true),
        );
        AUTO_CREATED_PLACEHOLDER_TS_COL.to_string()
    };

    let column_defs =
        column_schemas_to_defs(column_schemas, &primary_keys).context(ConvertColumnSchemaSnafu)?;
    Ok(CreateTableExpr {
        catalog_name: sink_table_name[0].clone(),
        schema_name: sink_table_name[1].clone(),
        table_name: sink_table_name[2].clone(),
        desc: "Auto created table by flow engine".to_string(),
        column_defs,
        time_index,
        primary_keys,
        create_if_not_exists: true,
        table_options: Default::default(),
        table_id: None,
        engine: "mito".to_string(),
    })
}

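/// Uses the first timestamp column in the plan's output as the time index and
/// declares no primary keys.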
fn build_by_sql_schema(plan: &LogicalPlan) -> Result<TableDef, Error> {
    let first_time_stamp = plan.schema().fields().iter().find_map(|f| {
        if ConcreteDataType::from_arrow_type(f.data_type()).is_timestamp() {
            Some(f.name().clone())
        } else {
            None
        }
    });
    Ok(TableDef {
        ts_col: first_time_stamp,
        pks: vec![],
    })
}

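/// Uses the first timestamp column as the time index and all string columns
/// as primary keys, mirroring the tag/value layout of TQL results.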
fn build_by_tql_schema(plan: &LogicalPlan) -> Result<TableDef, Error> {
    let first_time_stamp = plan.schema().fields().iter().find_map(|f| {
        if ConcreteDataType::from_arrow_type(f.data_type()).is_timestamp() {
            Some(f.name().clone())
        } else {
            None
        }
    });
    let string_columns = plan
        .schema()
        .fields()
        .iter()
        .filter_map(|f| {
            if ConcreteDataType::from_arrow_type(f.data_type()).is_string() {
                Some(f.name().clone())
            } else {
                None
            }
        })
        .collect::<Vec<_>>();

    Ok(TableDef {
        ts_col: first_time_stamp,
        pks: string_columns,
    })
}

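/// The derived time index column and primary keys for the sink table.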
struct TableDef {
    ts_col: Option<String>,
    pks: Vec<String>,
}

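/// Derives the sink table's primary keys from the final GROUP BY expressions
/// of the plan's aggregation, if any. A timestamp column among the group keys
/// becomes the time index instead of a primary key. Returns `None` when the
/// plan contains no aggregation.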
fn build_pk_from_aggr(plan: &LogicalPlan) -> Result<Option<TableDef>, Error> {
    let fields = plan.schema().fields();
    let mut pk_names = FindGroupByFinalName::default();

    plan.visit(&mut pk_names)
        .with_context(|_| DatafusionSnafu {
            context: format!("Can't find aggr expr in plan {plan:?}"),
        })?;

    let Some(pk_final_names) = pk_names.get_group_expr_names() else {
        // No aggregation in the plan, so no primary keys can be derived.
        return Ok(None);
    };
    if pk_final_names.is_empty() {
        let first_ts_col = fields
            .iter()
            .find(|f| ConcreteDataType::from_arrow_type(f.data_type()).is_timestamp())
            .map(|f| f.name().clone());
        return Ok(Some(TableDef {
            ts_col: first_ts_col,
            pks: vec![],
        }));
    }

    let all_pk_cols: Vec<_> = fields
        .iter()
        .filter(|f| pk_final_names.contains(f.name()))
        .map(|f| f.name().clone())
        .collect();
    // The first timestamp column among the group keys becomes the time index.
    let first_time_stamp = fields
        .iter()
        .find(|f| {
            all_pk_cols.contains(f.name())
                && ConcreteDataType::from_arrow_type(f.data_type()).is_timestamp()
        })
        .map(|f| f.name().clone());

    // The time index column must not also be a primary key.
    let all_pk_cols: Vec<_> = all_pk_cols
        .into_iter()
        .filter(|col| first_time_stamp.as_ref() != Some(col))
        .collect();

    Ok(Some(TableDef {
        ts_col: first_time_stamp,
        pks: all_pk_cols,
    }))
}

#[cfg(test)]
mod test {
    use api::v1::column_def::try_as_column_schema;
    use pretty_assertions::assert_eq;
    use session::context::QueryContext;

    use super::*;
    use crate::test_utils::create_test_query_engine;

    #[tokio::test]
    async fn test_gen_create_table_sql() {
        let query_engine = create_test_query_engine();
        let ctx = QueryContext::arc();
        struct TestCase {
            sql: String,
            sink_table_name: String,
            column_schemas: Vec<ColumnSchema>,
            primary_keys: Vec<String>,
            time_index: String,
        }

        let update_at_schema = ColumnSchema::new(
            AUTO_CREATED_UPDATE_AT_TS_COL,
            ConcreteDataType::timestamp_millisecond_datatype(),
            true,
        );

        let ts_placeholder_schema = ColumnSchema::new(
            AUTO_CREATED_PLACEHOLDER_TS_COL,
            ConcreteDataType::timestamp_millisecond_datatype(),
            false,
        )
        .with_time_index(true);

        let testcases = vec![
            TestCase {
                sql: "SELECT number, ts FROM numbers_with_ts".to_string(),
                sink_table_name: "new_table".to_string(),
                column_schemas: vec![
                    ColumnSchema::new("number", ConcreteDataType::uint32_datatype(), true),
                    ColumnSchema::new(
                        "ts",
                        ConcreteDataType::timestamp_millisecond_datatype(),
                        false,
                    )
                    .with_time_index(true),
                    update_at_schema.clone(),
                ],
                primary_keys: vec![],
                time_index: "ts".to_string(),
            },
            TestCase {
                sql: "SELECT number, max(ts) FROM numbers_with_ts GROUP BY number".to_string(),
                sink_table_name: "new_table".to_string(),
                column_schemas: vec![
                    ColumnSchema::new("number", ConcreteDataType::uint32_datatype(), true),
                    ColumnSchema::new(
                        "max(numbers_with_ts.ts)",
                        ConcreteDataType::timestamp_millisecond_datatype(),
                        true,
                    ),
                    update_at_schema.clone(),
                    ts_placeholder_schema.clone(),
                ],
                primary_keys: vec!["number".to_string()],
                time_index: AUTO_CREATED_PLACEHOLDER_TS_COL.to_string(),
            },
            TestCase {
                sql: "SELECT max(number), ts FROM numbers_with_ts GROUP BY ts".to_string(),
                sink_table_name: "new_table".to_string(),
                column_schemas: vec![
                    ColumnSchema::new(
                        "max(numbers_with_ts.number)",
                        ConcreteDataType::uint32_datatype(),
                        true,
                    ),
                    ColumnSchema::new(
                        "ts",
                        ConcreteDataType::timestamp_millisecond_datatype(),
                        false,
                    )
                    .with_time_index(true),
                    update_at_schema.clone(),
                ],
                primary_keys: vec![],
                time_index: "ts".to_string(),
            },
            TestCase {
                sql: "SELECT number, ts FROM numbers_with_ts GROUP BY ts, number".to_string(),
                sink_table_name: "new_table".to_string(),
                column_schemas: vec![
                    ColumnSchema::new("number", ConcreteDataType::uint32_datatype(), true),
                    ColumnSchema::new(
                        "ts",
                        ConcreteDataType::timestamp_millisecond_datatype(),
                        false,
                    )
                    .with_time_index(true),
                    update_at_schema.clone(),
                ],
                primary_keys: vec!["number".to_string()],
                time_index: "ts".to_string(),
            },
        ];

        for tc in testcases {
            let plan = sql_to_df_plan(ctx.clone(), query_engine.clone(), &tc.sql, true)
                .await
                .unwrap();
            let expr = create_table_with_expr(
                &plan,
                &[
                    "greptime".to_string(),
                    "public".to_string(),
                    tc.sink_table_name.clone(),
                ],
                &QueryType::Sql,
            )
            .unwrap();
            let column_schemas = expr
                .column_defs
                .iter()
                .map(|c| try_as_column_schema(c).unwrap())
                .collect::<Vec<_>>();
            assert_eq!(tc.column_schemas, column_schemas, "{:?}", tc.sql);
            assert_eq!(tc.primary_keys, expr.primary_keys, "{:?}", tc.sql);
            assert_eq!(tc.time_index, expr.time_index, "{:?}", tc.sql);
        }
    }
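
    // A small sanity check for the merge-mode helper; the option values here
    // are illustrative.
    #[test]
    fn test_is_merge_mode_last_non_null() {
        let mut options = HashMap::new();
        assert!(!is_merge_mode_last_non_null(&options));

        options.insert(MERGE_MODE_KEY.to_string(), "last_non_null".to_string());
        assert!(is_merge_mode_last_non_null(&options));

        // The comparison is case-insensitive.
        options.insert(MERGE_MODE_KEY.to_string(), "LAST_NON_NULL".to_string());
        assert!(is_merge_mode_last_non_null(&options));

        options.insert(MERGE_MODE_KEY.to_string(), "last_row".to_string());
        assert!(!is_merge_mode_last_non_null(&options));
    }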
}