use std::collections::{BTreeSet, HashSet};
use std::sync::{Arc, RwLock};
use std::time::{Duration, SystemTime, UNIX_EPOCH};

use api::v1::CreateTableExpr;
use arrow_schema::Fields;
use catalog::CatalogManagerRef;
use common_error::ext::BoxedError;
use common_query::logical_plan::breakup_insert_plan;
use common_telemetry::tracing::warn;
use common_telemetry::{debug, info};
use common_time::Timestamp;
use datafusion::datasource::DefaultTableSource;
use datafusion::sql::unparser::expr_to_sql;
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_common::DFSchemaRef;
use datafusion_expr::{DmlStatement, LogicalPlan, WriteOp};
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnSchema, Schema};
use operator::expr_helper::column_schemas_to_defs;
use query::query_engine::DefaultSerializer;
use query::QueryEngineRef;
use session::context::QueryContextRef;
use snafu::{ensure, OptionExt, ResultExt};
use sql::parser::{ParseOptions, ParserContext};
use sql::statements::statement::Statement;
use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
use table::table::adapter::DfTableProviderAdapter;
use tokio::sync::oneshot;
use tokio::sync::oneshot::error::TryRecvError;
use tokio::time::Instant;

use crate::adapter::{AUTO_CREATED_PLACEHOLDER_TS_COL, AUTO_CREATED_UPDATE_AT_TS_COL};
use crate::batching_mode::frontend_client::FrontendClient;
use crate::batching_mode::state::{FilterExprInfo, TaskState};
use crate::batching_mode::time_window::TimeWindowExpr;
use crate::batching_mode::utils::{
    get_table_info_df_schema, sql_to_df_plan, AddAutoColumnRewriter, AddFilterRewriter,
    FindGroupByFinalName,
};
use crate::batching_mode::BatchingModeOptions;
use crate::df_optimizer::apply_df_optimizer;
use crate::error::{
    ConvertColumnSchemaSnafu, DatafusionSnafu, ExternalSnafu, InvalidQuerySnafu,
    SubstraitEncodeLogicalPlanSnafu, UnexpectedSnafu,
};
use crate::metrics::{
    METRIC_FLOW_BATCHING_ENGINE_ERROR_CNT, METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME,
    METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY, METRIC_FLOW_BATCHING_ENGINE_START_QUERY_CNT,
    METRIC_FLOW_ROWS,
};
use crate::{Error, FlowId};

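/// Static configuration of a batching-mode task, set once when the task is created.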
#[derive(Clone)]
pub struct TaskConfig {
    pub flow_id: FlowId,
    pub query: String,
    pub output_schema: DFSchemaRef,
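    /// Expression used to evaluate the time window bounds for a given timestamp,
    /// if the query has a time window.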
    pub time_window_expr: Option<TimeWindowExpr>,
    pub expire_after: Option<i64>,
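    /// Sink table name as `[catalog, schema, table]`.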
    sink_table_name: [String; 3],
    pub source_table_names: HashSet<[String; 3]>,
    catalog_manager: CatalogManagerRef,
    query_type: QueryType,
    batch_opts: Arc<BatchingModeOptions>,
}

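/// Classifies the flow's query by parsing it: a `TQL` statement (e.g. `TQL EVAL ...`)
/// yields [`QueryType::Tql`], anything else [`QueryType::Sql`]. Exactly one statement
/// is required.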
fn determine_query_type(query: &str, query_ctx: &QueryContextRef) -> Result<QueryType, Error> {
    let stmts =
        ParserContext::create_with_dialect(query, query_ctx.sql_dialect(), ParseOptions::default())
            .map_err(BoxedError::new)
            .context(ExternalSnafu)?;

    ensure!(
        stmts.len() == 1,
        InvalidQuerySnafu {
            reason: format!("Expect only one statement, found {}", stmts.len())
        }
    );
    let stmt = &stmts[0];
    match stmt {
        Statement::Tql(_) => Ok(QueryType::Tql),
        _ => Ok(QueryType::Sql),
    }
}

#[derive(Debug, Clone)]
enum QueryType {
    /// A TQL (time-series query language) statement.
    Tql,
    /// A SQL statement.
    Sql,
}

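/// A batching-mode flow task: `config` holds the immutable query definition, `state`
/// the mutable run state (dirty time windows, query context, shutdown channel).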
#[derive(Clone)]
pub struct BatchingTask {
    pub config: Arc<TaskConfig>,
    pub state: Arc<RwLock<TaskState>>,
}

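/// Arguments for [`BatchingTask::try_new`].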
pub struct TaskArgs<'a> {
    pub flow_id: FlowId,
    pub query: &'a str,
    pub plan: LogicalPlan,
    pub time_window_expr: Option<TimeWindowExpr>,
    pub expire_after: Option<i64>,
    pub sink_table_name: [String; 3],
    pub source_table_names: Vec<[String; 3]>,
    pub query_ctx: QueryContextRef,
    pub catalog_manager: CatalogManagerRef,
    pub shutdown_rx: oneshot::Receiver<()>,
    pub batch_opts: Arc<BatchingModeOptions>,
}

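/// A rewritten query plan plus the time-window filter (if any) that was pushed into it.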
pub struct PlanInfo {
    pub plan: LogicalPlan,
    pub filter: Option<FilterExprInfo>,
}

impl BatchingTask {
    #[allow(clippy::too_many_arguments)]
    pub fn try_new(
        TaskArgs {
            flow_id,
            query,
            plan,
            time_window_expr,
            expire_after,
            sink_table_name,
            source_table_names,
            query_ctx,
            catalog_manager,
            shutdown_rx,
            batch_opts,
        }: TaskArgs<'_>,
    ) -> Result<Self, Error> {
        Ok(Self {
            config: Arc::new(TaskConfig {
                flow_id,
                query: query.to_string(),
                time_window_expr,
                expire_after,
                sink_table_name,
                source_table_names: source_table_names.into_iter().collect(),
                catalog_manager,
                output_schema: plan.schema().clone(),
                query_type: determine_query_type(query, &query_ctx)?,
                batch_opts,
            }),
            state: Arc::new(RwLock::new(TaskState::new(query_ctx, shutdown_rx))),
        })
    }

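    /// Marks the time range from `now - expire_after` (or from timestamp 0 when
    /// `expire_after` is unset) up to `now` as dirty, so the next run recomputes
    /// every time window in that range.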
    pub fn mark_all_windows_as_dirty(&self) -> Result<(), Error> {
        let now = SystemTime::now();
        let now = Timestamp::new_second(
            now.duration_since(UNIX_EPOCH)
                .expect("Time went backwards")
                .as_secs() as _,
        );
        let lower_bound = self
            .config
            .expire_after
            .map(|e| now.sub_duration(Duration::from_secs(e as _)))
            .transpose()
            .map_err(BoxedError::new)
            .context(ExternalSnafu)?
            .unwrap_or(Timestamp::new_second(0));
        debug!(
            "Flow {} mark range ({:?}, {:?}) as dirty",
            self.config.flow_id, lower_bound, now
        );
        self.state
            .write()
            .unwrap()
            .dirty_time_windows
            .add_window(lower_bound, Some(now));
        Ok(())
    }

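    /// Ensures the sink table exists, creating it (with `create_if_not_exists`) from the
    /// query's output schema when it does not. Always returns `Ok(None)`: table creation
    /// affects no rows.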
    pub async fn check_or_create_sink_table(
        &self,
        engine: &QueryEngineRef,
        frontend_client: &Arc<FrontendClient>,
    ) -> Result<Option<(u32, Duration)>, Error> {
        if !self.is_table_exist(&self.config.sink_table_name).await? {
            let create_table = self.gen_create_table_expr(engine.clone()).await?;
            info!(
                "Try creating sink table(if not exists) with expr: {:?}",
                create_table
            );
            self.create_table(frontend_client, create_table).await?;
            info!(
                "Sink table {}(if not exists) created",
                self.config.sink_table_name.join(".")
            );
        }

        Ok(None)
    }

    async fn is_table_exist(&self, table_name: &[String; 3]) -> Result<bool, Error> {
        self.config
            .catalog_manager
            .table_exists(&table_name[0], &table_name[1], &table_name[2], None)
            .await
            .map_err(BoxedError::new)
            .context(ExternalSnafu)
    }

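    /// Generates one insert plan (covering at most `max_window_cnt` dirty windows) and
    /// executes it, returning the affected row count and elapsed time, or `None` when
    /// there is nothing to query.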
    pub async fn gen_exec_once(
        &self,
        engine: &QueryEngineRef,
        frontend_client: &Arc<FrontendClient>,
        max_window_cnt: Option<usize>,
    ) -> Result<Option<(u32, Duration)>, Error> {
        if let Some(new_query) = self.gen_insert_plan(engine, max_window_cnt).await? {
            debug!("Generate new query: {}", new_query.plan);
            self.execute_logical_plan(frontend_client, &new_query.plan)
                .await
        } else {
            debug!("Generate no query");
            Ok(None)
        }
    }

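    /// Builds an `INSERT INTO <sink table>` plan from the time-window rewritten query,
    /// after verifying that every output column exists in the sink table.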
    pub async fn gen_insert_plan(
        &self,
        engine: &QueryEngineRef,
        max_window_cnt: Option<usize>,
    ) -> Result<Option<PlanInfo>, Error> {
        let (table, df_schema) = get_table_info_df_schema(
            self.config.catalog_manager.clone(),
            self.config.sink_table_name.clone(),
        )
        .await?;

        let new_query = self
            .gen_query_with_time_window(
                engine.clone(),
                &table.table_info().meta.schema,
                max_window_cnt,
            )
            .await?;

        let insert_into_info = if let Some(new_query) = new_query {
            let table_columns = df_schema
                .columns()
                .into_iter()
                .map(|c| c.name)
                .collect::<BTreeSet<_>>();
            for column in new_query.plan.schema().columns() {
                ensure!(
                    table_columns.contains(column.name()),
                    InvalidQuerySnafu {
                        reason: format!(
                            "Column {} not found in sink table with columns {:?}",
                            column, table_columns
                        ),
                    }
                );
            }

            let table_provider = Arc::new(DfTableProviderAdapter::new(table));
            let table_source = Arc::new(DefaultTableSource::new(table_provider));

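            // Wrap the rewritten query in an append-mode INSERT targeting the sink table.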
            let plan = LogicalPlan::Dml(DmlStatement::new(
                datafusion_common::TableReference::Full {
                    catalog: self.config.sink_table_name[0].clone().into(),
                    schema: self.config.sink_table_name[1].clone().into(),
                    table: self.config.sink_table_name[2].clone().into(),
                },
                table_source,
                WriteOp::Insert(datafusion_expr::dml::InsertOp::Append),
                Arc::new(new_query.plan),
            ));
            PlanInfo {
                plan,
                filter: new_query.filter,
            }
        } else {
            return Ok(None);
        };
        let insert_into = insert_into_info
            .plan
            .recompute_schema()
            .context(DatafusionSnafu {
                context: "Failed to recompute schema",
            })?;

        Ok(Some(PlanInfo {
            plan: insert_into,
            filter: insert_into_info.filter,
        }))
    }

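    /// Submits `expr` as a `CREATE TABLE` to a frontend under the sink table's
    /// catalog and schema.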
    pub async fn create_table(
        &self,
        frontend_client: &Arc<FrontendClient>,
        expr: CreateTableExpr,
    ) -> Result<(), Error> {
        let catalog = &self.config.sink_table_name[0];
        let schema = &self.config.sink_table_name[1];
        frontend_client
            .create(expr.clone(), catalog, schema)
            .await?;
        Ok(())
    }

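    /// Executes `plan` on a frontend, recording query metrics and slow-query warnings.
    /// Returns the affected row count and the elapsed time.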
    pub async fn execute_logical_plan(
        &self,
        frontend_client: &Arc<FrontendClient>,
        plan: &LogicalPlan,
    ) -> Result<Option<(u32, Duration)>, Error> {
        let instant = Instant::now();
        let flow_id = self.config.flow_id;

        debug!(
            "Executing flow {flow_id}(expire_after={:?} secs) with query {}",
            self.config.expire_after, &plan
        );

        let catalog = &self.config.sink_table_name[0];
        let schema = &self.config.sink_table_name[1];

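        // Qualify every TableScan with the catalog and schema so the plan resolves on the frontend.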
        let plan = plan
            .clone()
            .transform_down_with_subqueries(|p| {
                if let LogicalPlan::TableScan(mut table_scan) = p {
                    let resolved = table_scan.table_name.resolve(catalog, schema);
                    table_scan.table_name = resolved.into();
                    Ok(Transformed::yes(LogicalPlan::TableScan(table_scan)))
                } else {
                    Ok(Transformed::no(p))
                }
            })
            .with_context(|_| DatafusionSnafu {
                context: format!("Failed to fix table ref in logical plan, plan={:?}", plan),
            })?
            .data;

        let mut peer_desc = None;

        let res = {
            let _timer = METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME
                .with_label_values(&[flow_id.to_string().as_str()])
                .start_timer();

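            // If the plan is a plain INSERT, split it into target table and input plan so
            // the frontend can apply the write directly; otherwise ship the whole plan
            // encoded as Substrait.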
            let req = if let Some((insert_to, insert_plan)) =
                breakup_insert_plan(&plan, catalog, schema)
            {
                let message = DFLogicalSubstraitConvertor {}
                    .encode(&insert_plan, DefaultSerializer)
                    .context(SubstraitEncodeLogicalPlanSnafu)?;
                api::v1::greptime_request::Request::Query(api::v1::QueryRequest {
                    query: Some(api::v1::query_request::Query::InsertIntoPlan(
                        api::v1::InsertIntoPlan {
                            table_name: Some(insert_to),
                            logical_plan: message.to_vec(),
                        },
                    )),
                })
            } else {
                let message = DFLogicalSubstraitConvertor {}
                    .encode(&plan, DefaultSerializer)
                    .context(SubstraitEncodeLogicalPlanSnafu)?;

                api::v1::greptime_request::Request::Query(api::v1::QueryRequest {
                    query: Some(api::v1::query_request::Query::LogicalPlan(message.to_vec())),
                })
            };

            frontend_client
                .handle(req, catalog, schema, &mut peer_desc)
                .await
        };

        let elapsed = instant.elapsed();
        if let Ok(affected_rows) = &res {
            debug!(
                "Flow {flow_id} executed, affected_rows: {affected_rows:?}, elapsed: {:?}",
                elapsed
            );
            METRIC_FLOW_ROWS
                .with_label_values(&[format!("{}-out-batching", flow_id).as_str()])
                .inc_by(*affected_rows as _);
        } else if let Err(err) = &res {
            warn!(
                "Failed to execute Flow {flow_id} on frontend {:?}, result: {err:?}, elapsed: {:?} with query: {}",
                peer_desc, elapsed, &plan
            );
        }

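        // Surface queries that exceed the slow-query threshold in both logs and metrics.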
        if elapsed >= self.config.batch_opts.slow_query_threshold {
            warn!(
                "Flow {flow_id} on frontend {:?} executed for {:?} before complete, query: {}",
                peer_desc, elapsed, &plan
            );
            METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY
                .with_label_values(&[
                    flow_id.to_string().as_str(),
                    &peer_desc.unwrap_or_default().to_string(),
                ])
                .observe(elapsed.as_secs_f64());
        }

        self.state
            .write()
            .unwrap()
            .after_query_exec(elapsed, res.is_ok());

        let res = res?;

        Ok(Some((res, elapsed)))
    }

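    /// The task's main loop: repeatedly generates a query over the dirty time windows,
    /// executes it on a frontend, then sleeps until the next scheduled run. Exits when
    /// the shutdown channel fires or closes.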
    pub async fn start_executing_loop(
        &self,
        engine: QueryEngineRef,
        frontend_client: Arc<FrontendClient>,
    ) {
        let flow_id_str = self.config.flow_id.to_string();
        let mut max_window_cnt = None;
        loop {
            {
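                // Check the shutdown channel before doing any work this iteration.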
                let mut state = self.state.write().unwrap();
                match state.shutdown_rx.try_recv() {
                    Ok(()) => break,
                    Err(TryRecvError::Closed) => {
                        warn!(
                            "Unexpected shutdown flow {}, shutdown anyway",
                            self.config.flow_id
                        );
                        break;
                    }
                    Err(TryRecvError::Empty) => (),
                }
            }
            METRIC_FLOW_BATCHING_ENGINE_START_QUERY_CNT
                .with_label_values(&[&flow_id_str])
                .inc();

            let min_refresh = self.config.batch_opts.experimental_min_refresh_duration;

            let new_query = match self.gen_insert_plan(&engine, max_window_cnt).await {
                Ok(new_query) => new_query,
                Err(err) => {
                    common_telemetry::error!(err; "Failed to generate query for flow={}", self.config.flow_id);
                    tokio::time::sleep(min_refresh).await;
                    continue;
                }
            };

            let res = if let Some(new_query) = &new_query {
                self.execute_logical_plan(&frontend_client, &new_query.plan)
                    .await
            } else {
                Ok(None)
            };

            match res {
                Ok(Some(_)) => {
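                    // After a successful run, let the next query cover one more time
                    // window, capped by the configured maximum filters per query.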
                    max_window_cnt = max_window_cnt.map(|cnt| {
                        (cnt + 1).min(self.config.batch_opts.experimental_max_filter_num_per_query)
                    });
                    let sleep_until = {
                        let state = self.state.write().unwrap();

                        let time_window_size = self
                            .config
                            .time_window_expr
                            .as_ref()
                            .and_then(|t| *t.time_window_size());

                        state.get_next_start_query_time(
                            self.config.flow_id,
                            &time_window_size,
                            min_refresh,
                            Some(self.config.batch_opts.query_timeout),
                            self.config.batch_opts.experimental_max_filter_num_per_query,
                        )
                    };
                    tokio::time::sleep_until(sleep_until).await;
                }
                Ok(None) => {
                    debug!(
                        "Flow id = {:?} found no new data, sleep for {:?} then continue",
                        self.config.flow_id, min_refresh
                    );
                    tokio::time::sleep(min_refresh).await;
                    continue;
                }
                Err(err) => {
                    METRIC_FLOW_BATCHING_ENGINE_ERROR_CNT
                        .with_label_values(&[&flow_id_str])
                        .inc();
                    match new_query {
                        Some(query) => {
                            common_telemetry::error!(err; "Failed to execute query for flow={} with query: {}", self.config.flow_id, query.plan);
                            // The failed windows are still dirty: put them back so they
                            // are retried, and shrink the next query to a single window.
                            self.state.write().unwrap().dirty_time_windows.add_windows(
                                query.filter.map(|f| f.time_ranges).unwrap_or_default(),
                            );
                            max_window_cnt = Some(1);
                        }
                        None => {
                            common_telemetry::error!(err; "Failed to generate query for flow={}", self.config.flow_id)
                        }
                    }
                    tokio::time::sleep(min_refresh).await;
                }
            }
        }
    }

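    /// Derives a `CREATE TABLE` expression for the sink table from the optimized query
    /// plan's schema (see [`create_table_with_expr`]).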
    async fn gen_create_table_expr(
        &self,
        engine: QueryEngineRef,
    ) -> Result<CreateTableExpr, Error> {
        let query_ctx = self.state.read().unwrap().query_ctx.clone();
        let plan =
            sql_to_df_plan(query_ctx.clone(), engine.clone(), &self.config.query, true).await?;
        create_table_with_expr(&plan, &self.config.sink_table_name)
    }

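    /// Rewrites the query with filter expressions covering the dirty time windows (at
    /// most `max_window_cnt` of them). Falls back to the unfiltered query when no time
    /// window can be determined (e.g. for TQL queries), and returns `None` when there
    /// is no dirty data to process.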
    async fn gen_query_with_time_window(
        &self,
        engine: QueryEngineRef,
        sink_table_schema: &Arc<Schema>,
        max_window_cnt: Option<usize>,
    ) -> Result<Option<PlanInfo>, Error> {
        let query_ctx = self.state.read().unwrap().query_ctx.clone();
        let start = SystemTime::now();
        let since_the_epoch = start
            .duration_since(UNIX_EPOCH)
            .expect("Time went backwards");
        let low_bound = self
            .config
            .expire_after
            .map(|e| since_the_epoch.as_secs() - e as u64)
            .unwrap_or(u64::MIN);

        let low_bound = Timestamp::new_second(low_bound as i64);

        let expire_time_window_bound = self
            .config
            .time_window_expr
            .as_ref()
            .map(|expr| expr.eval(low_bound))
            .transpose()?;

        let (Some((Some(l), Some(u))), QueryType::Sql) =
            (expire_time_window_bound, &self.config.query_type)
        else {
            // Without a usable time window (or for TQL), run the full query and clear
            // the dirty windows.
            debug!(
                "Flow id = {:?}, can't get window size: precise_lower_bound={expire_time_window_bound:?}, using the same query", self.config.flow_id
            );
            self.state.write().unwrap().dirty_time_windows.clean();

            let mut add_auto_column = AddAutoColumnRewriter::new(sink_table_schema.clone());

            let plan = sql_to_df_plan(query_ctx.clone(), engine.clone(), &self.config.query, false)
                .await?;

            let plan = plan
                .clone()
                .rewrite(&mut add_auto_column)
                .with_context(|_| DatafusionSnafu {
                    context: format!("Failed to rewrite plan:\n {}\n", plan),
                })?
                .data;

            return Ok(Some(PlanInfo { plan, filter: None }));
        };

        debug!(
            "Flow id = {:?}, found time window: precise_lower_bound={:?}, precise_upper_bound={:?} with dirty time windows: {:?}",
            self.config.flow_id, l, u, self.state.read().unwrap().dirty_time_windows
        );
        let window_size = u.sub(&l).with_context(|| UnexpectedSnafu {
            reason: format!("Can't get window size from {u:?} - {l:?}"),
        })?;
        let col_name = self
            .config
            .time_window_expr
            .as_ref()
            .map(|expr| expr.column_name.clone())
            .with_context(|| UnexpectedSnafu {
                reason: format!(
                    "Flow id={:?}, Failed to get column name from time window expr",
                    self.config.flow_id
                ),
            })?;

        let expr = self
            .state
            .write()
            .unwrap()
            .dirty_time_windows
            .gen_filter_exprs(
                &col_name,
                Some(l),
                window_size,
                max_window_cnt
                    .unwrap_or(self.config.batch_opts.experimental_max_filter_num_per_query),
                self.config.flow_id,
                Some(self),
            )?;

        debug!(
            "Flow id={:?}, Generated filter expr: {:?}",
            self.config.flow_id,
            expr.as_ref()
                .map(
                    |expr| expr_to_sql(&expr.expr).with_context(|_| DatafusionSnafu {
                        context: format!("Failed to generate filter expr from {expr:?}"),
                    })
                )
                .transpose()?
                .map(|s| s.to_string())
        );

        let Some(expr) = expr else {
            // No dirty windows means no new data, so there is nothing to update.
            debug!("Flow id={:?}, no new data, not update", self.config.flow_id);
            return Ok(None);
        };

        let mut add_filter = AddFilterRewriter::new(expr.expr.clone());
        let mut add_auto_column = AddAutoColumnRewriter::new(sink_table_schema.clone());

        let plan =
            sql_to_df_plan(query_ctx.clone(), engine.clone(), &self.config.query, false).await?;
        let rewrite = plan
            .clone()
            .rewrite(&mut add_filter)
            .and_then(|p| p.data.rewrite(&mut add_auto_column))
            .with_context(|_| DatafusionSnafu {
                context: format!("Failed to rewrite plan:\n {}\n", plan),
            })?
            .data;
        let new_plan = apply_df_optimizer(rewrite).await?;

        let info = PlanInfo {
            plan: new_plan.clone(),
            filter: Some(expr),
        };

        Ok(Some(info))
    }
}

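/// Builds the sink table's `CreateTableExpr` from the plan schema: group-by columns become
/// the primary key, the first timestamp group-by column (if any) becomes the time index,
/// and an `update_at` column plus, when needed, a placeholder time-index column are appended.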
fn create_table_with_expr(
    plan: &LogicalPlan,
    sink_table_name: &[String; 3],
) -> Result<CreateTableExpr, Error> {
    let fields = plan.schema().fields();
    let (first_time_stamp, primary_keys) = build_primary_key_constraint(plan, fields)?;

    let mut column_schemas = Vec::new();
    for field in fields {
        let name = field.name();
        let ty = ConcreteDataType::from_arrow_type(field.data_type());
        let col_schema = if first_time_stamp == Some(name.clone()) {
            ColumnSchema::new(name, ty, false).with_time_index(true)
        } else {
            ColumnSchema::new(name, ty, true)
        };
        column_schemas.push(col_schema);
    }

    let update_at_schema = ColumnSchema::new(
        AUTO_CREATED_UPDATE_AT_TS_COL,
        ConcreteDataType::timestamp_millisecond_datatype(),
        true,
    );
    column_schemas.push(update_at_schema);

    let time_index = if let Some(time_index) = first_time_stamp {
        time_index
    } else {
        column_schemas.push(
            ColumnSchema::new(
                AUTO_CREATED_PLACEHOLDER_TS_COL,
                ConcreteDataType::timestamp_millisecond_datatype(),
                false,
            )
            .with_time_index(true),
        );
        AUTO_CREATED_PLACEHOLDER_TS_COL.to_string()
    };

    let column_defs =
        column_schemas_to_defs(column_schemas, &primary_keys).context(ConvertColumnSchemaSnafu)?;
    Ok(CreateTableExpr {
        catalog_name: sink_table_name[0].clone(),
        schema_name: sink_table_name[1].clone(),
        table_name: sink_table_name[2].clone(),
        desc: "Auto created table by flow engine".to_string(),
        column_defs,
        time_index,
        primary_keys,
        create_if_not_exists: true,
        table_options: Default::default(),
        table_id: None,
        engine: "mito".to_string(),
    })
}

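/// Finds the primary-key constraint for the sink table from the plan's final
/// `GROUP BY` expressions.
///
/// Returns:
/// * the first group-by column with a timestamp type (to be used as time index), if any;
/// * the remaining group-by columns, to be used as primary keys.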
fn build_primary_key_constraint(
    plan: &LogicalPlan,
    schema: &Fields,
) -> Result<(Option<String>, Vec<String>), Error> {
    let mut pk_names = FindGroupByFinalName::default();

    plan.visit(&mut pk_names)
        .with_context(|_| DatafusionSnafu {
            context: format!("Can't find aggr expr in plan {plan:?}"),
        })?;

    let pk_final_names = pk_names.get_group_expr_names().unwrap_or_default();
    if pk_final_names.is_empty() {
        return Ok((None, Vec::new()));
    }

    let all_pk_cols: Vec<_> = schema
        .iter()
        .filter(|f| pk_final_names.contains(f.name()))
        .map(|f| f.name().clone())
        .collect();

    // The first group-by column with a timestamp type becomes the time index.
    let first_time_stamp = schema
        .iter()
        .find(|f| {
            all_pk_cols.contains(&f.name().clone())
                && ConcreteDataType::from_arrow_type(f.data_type()).is_timestamp()
        })
        .map(|f| f.name().clone());

    let all_pk_cols: Vec<_> = all_pk_cols
        .into_iter()
        .filter(|col| first_time_stamp != Some(col.to_string()))
        .collect();

    Ok((first_time_stamp, all_pk_cols))
}

#[cfg(test)]
mod test {
    use api::v1::column_def::try_as_column_schema;
    use pretty_assertions::assert_eq;
    use session::context::QueryContext;

    use super::*;
    use crate::test_utils::create_test_query_engine;

    #[tokio::test]
    async fn test_gen_create_table_sql() {
        let query_engine = create_test_query_engine();
        let ctx = QueryContext::arc();
        struct TestCase {
            sql: String,
            sink_table_name: String,
            column_schemas: Vec<ColumnSchema>,
            primary_keys: Vec<String>,
            time_index: String,
        }

        let update_at_schema = ColumnSchema::new(
            AUTO_CREATED_UPDATE_AT_TS_COL,
            ConcreteDataType::timestamp_millisecond_datatype(),
            true,
        );

        let ts_placeholder_schema = ColumnSchema::new(
            AUTO_CREATED_PLACEHOLDER_TS_COL,
            ConcreteDataType::timestamp_millisecond_datatype(),
            false,
        )
        .with_time_index(true);

        let testcases = vec![
            TestCase {
                sql: "SELECT number, ts FROM numbers_with_ts".to_string(),
                sink_table_name: "new_table".to_string(),
                column_schemas: vec![
                    ColumnSchema::new("number", ConcreteDataType::uint32_datatype(), true),
                    ColumnSchema::new(
                        "ts",
                        ConcreteDataType::timestamp_millisecond_datatype(),
                        true,
                    ),
                    update_at_schema.clone(),
                    ts_placeholder_schema.clone(),
                ],
                primary_keys: vec![],
                time_index: AUTO_CREATED_PLACEHOLDER_TS_COL.to_string(),
            },
            TestCase {
                sql: "SELECT number, max(ts) FROM numbers_with_ts GROUP BY number".to_string(),
                sink_table_name: "new_table".to_string(),
                column_schemas: vec![
                    ColumnSchema::new("number", ConcreteDataType::uint32_datatype(), true),
                    ColumnSchema::new(
                        "max(numbers_with_ts.ts)",
                        ConcreteDataType::timestamp_millisecond_datatype(),
                        true,
                    ),
                    update_at_schema.clone(),
                    ts_placeholder_schema.clone(),
                ],
                primary_keys: vec!["number".to_string()],
                time_index: AUTO_CREATED_PLACEHOLDER_TS_COL.to_string(),
            },
            TestCase {
                sql: "SELECT max(number), ts FROM numbers_with_ts GROUP BY ts".to_string(),
                sink_table_name: "new_table".to_string(),
                column_schemas: vec![
                    ColumnSchema::new(
                        "max(numbers_with_ts.number)",
                        ConcreteDataType::uint32_datatype(),
                        true,
                    ),
                    ColumnSchema::new(
                        "ts",
                        ConcreteDataType::timestamp_millisecond_datatype(),
                        false,
                    )
                    .with_time_index(true),
                    update_at_schema.clone(),
                ],
                primary_keys: vec![],
                time_index: "ts".to_string(),
            },
            TestCase {
                sql: "SELECT number, ts FROM numbers_with_ts GROUP BY ts, number".to_string(),
                sink_table_name: "new_table".to_string(),
                column_schemas: vec![
                    ColumnSchema::new("number", ConcreteDataType::uint32_datatype(), true),
                    ColumnSchema::new(
                        "ts",
                        ConcreteDataType::timestamp_millisecond_datatype(),
                        false,
                    )
                    .with_time_index(true),
                    update_at_schema.clone(),
                ],
                primary_keys: vec!["number".to_string()],
                time_index: "ts".to_string(),
            },
        ];

        for tc in testcases {
            let plan = sql_to_df_plan(ctx.clone(), query_engine.clone(), &tc.sql, true)
                .await
                .unwrap();
            let expr = create_table_with_expr(
                &plan,
                &[
                    "greptime".to_string(),
                    "public".to_string(),
                    tc.sink_table_name.clone(),
                ],
            )
            .unwrap();
            let column_schemas = expr
                .column_defs
                .iter()
                .map(|c| try_as_column_schema(c).unwrap())
                .collect::<Vec<_>>();
            assert_eq!(tc.column_schemas, column_schemas);
            assert_eq!(tc.primary_keys, expr.primary_keys);
            assert_eq!(tc.time_index, expr.time_index);
        }
    }
}