use std::collections::{BTreeSet, HashSet};
use std::sync::{Arc, RwLock};
use std::time::{Duration, SystemTime, UNIX_EPOCH};

use api::v1::CreateTableExpr;
use arrow_schema::Fields;
use catalog::CatalogManagerRef;
use common_error::ext::BoxedError;
use common_query::logical_plan::breakup_insert_plan;
use common_telemetry::tracing::warn;
use common_telemetry::{debug, info};
use common_time::Timestamp;
use datafusion::optimizer::analyzer::count_wildcard_rule::CountWildcardRule;
use datafusion::optimizer::AnalyzerRule;
use datafusion::sql::unparser::expr_to_sql;
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_common::DFSchemaRef;
use datafusion_expr::{DmlStatement, LogicalPlan, WriteOp};
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnSchema, Schema};
use operator::expr_helper::column_schemas_to_defs;
use query::query_engine::DefaultSerializer;
use query::QueryEngineRef;
use session::context::QueryContextRef;
use snafu::{ensure, OptionExt, ResultExt};
use sql::parser::{ParseOptions, ParserContext};
use sql::statements::statement::Statement;
use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
use tokio::sync::oneshot;
use tokio::sync::oneshot::error::TryRecvError;
use tokio::time::Instant;

use crate::adapter::{AUTO_CREATED_PLACEHOLDER_TS_COL, AUTO_CREATED_UPDATE_AT_TS_COL};
use crate::batching_mode::frontend_client::FrontendClient;
use crate::batching_mode::state::{DirtyTimeWindows, TaskState};
use crate::batching_mode::time_window::TimeWindowExpr;
use crate::batching_mode::utils::{
    get_table_info_df_schema, sql_to_df_plan, AddAutoColumnRewriter, AddFilterRewriter,
    FindGroupByFinalName,
};
use crate::batching_mode::{
    DEFAULT_BATCHING_ENGINE_QUERY_TIMEOUT, MIN_REFRESH_DURATION, SLOW_QUERY_THRESHOLD,
};
use crate::df_optimizer::apply_df_optimizer;
use crate::error::{
    ConvertColumnSchemaSnafu, DatafusionSnafu, ExternalSnafu, InvalidQuerySnafu,
    SubstraitEncodeLogicalPlanSnafu, UnexpectedSnafu,
};
use crate::metrics::{
    METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME, METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY,
};
use crate::{Error, FlowId};

/// Static configuration of a batching mode task.
#[derive(Clone)]
pub struct TaskConfig {
    pub flow_id: FlowId,
    pub query: String,
    pub output_schema: DFSchemaRef,
    pub time_window_expr: Option<TimeWindowExpr>,
    /// Expire interval in seconds; data older than this is not refreshed.
    pub expire_after: Option<i64>,
    /// Fully qualified `[catalog, schema, table]` name of the sink table.
    sink_table_name: [String; 3],
    pub source_table_names: HashSet<[String; 3]>,
    catalog_manager: CatalogManagerRef,
    query_type: QueryType,
}

fn determine_query_type(query: &str, query_ctx: &QueryContextRef) -> Result<QueryType, Error> {
    let stmts =
        ParserContext::create_with_dialect(query, query_ctx.sql_dialect(), ParseOptions::default())
            .map_err(BoxedError::new)
            .context(ExternalSnafu)?;

    ensure!(
        stmts.len() == 1,
        InvalidQuerySnafu {
            reason: format!("Expect only one statement, found {}", stmts.len())
        }
    );
    let stmt = &stmts[0];
    match stmt {
        Statement::Tql(_) => Ok(QueryType::Tql),
        _ => Ok(QueryType::Sql),
    }
}
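
// A sketch of how queries are classified (the statement texts below are
// illustrative, not taken from this crate's tests): a statement that parses to
// `Statement::Tql` is treated as TQL, anything else falls through to SQL.
//
//   determine_query_type("TQL EVAL (0, 10, '5s') rate(m[5m])", &ctx) // => QueryType::Tql
//   determine_query_type("SELECT avg(n) FROM t GROUP BY ts", &ctx)   // => QueryType::Sql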

#[derive(Debug, Clone)]
enum QueryType {
    /// The query is written in TQL.
    Tql,
    /// The query is written in SQL.
    Sql,
}

#[derive(Clone)]
pub struct BatchingTask {
    pub config: Arc<TaskConfig>,
    pub state: Arc<RwLock<TaskState>>,
}

impl BatchingTask {
    #[allow(clippy::too_many_arguments)]
    pub fn try_new(
        flow_id: FlowId,
        query: &str,
        plan: LogicalPlan,
        time_window_expr: Option<TimeWindowExpr>,
        expire_after: Option<i64>,
        sink_table_name: [String; 3],
        source_table_names: Vec<[String; 3]>,
        query_ctx: QueryContextRef,
        catalog_manager: CatalogManagerRef,
        shutdown_rx: oneshot::Receiver<()>,
    ) -> Result<Self, Error> {
        Ok(Self {
            config: Arc::new(TaskConfig {
                flow_id,
                query: query.to_string(),
                time_window_expr,
                expire_after,
                sink_table_name,
                source_table_names: source_table_names.into_iter().collect(),
                catalog_manager,
                output_schema: plan.schema().clone(),
                query_type: determine_query_type(query, &query_ctx)?,
            }),
            state: Arc::new(RwLock::new(TaskState::new(query_ctx, shutdown_rx))),
        })
    }

    /// Mark the whole range from the expire lower bound (or timestamp 0 when
    /// no expire time is set) up to now as dirty, forcing a full refresh.
    pub fn mark_all_windows_as_dirty(&self) -> Result<(), Error> {
        let now = SystemTime::now();
        let now = Timestamp::new_second(
            now.duration_since(UNIX_EPOCH)
                .expect("Time went backwards")
                .as_secs() as _,
        );
        let lower_bound = self
            .config
            .expire_after
            .map(|e| now.sub_duration(Duration::from_secs(e as _)))
            .transpose()
            .map_err(BoxedError::new)
            .context(ExternalSnafu)?
            .unwrap_or(Timestamp::new_second(0));
        debug!(
            "Flow {} mark range ({:?}, {:?}) as dirty",
            self.config.flow_id, lower_bound, now
        );
        self.state
            .write()
            .unwrap()
            .dirty_time_windows
            .add_window(lower_bound, Some(now));
        Ok(())
    }
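
    // Worked example (a sketch): with `expire_after = Some(3600)` and the wall
    // clock at unix second `t`, the range `(t - 3600, t]` becomes dirty; with
    // `expire_after = None` the lower bound falls back to timestamp 0, so all
    // of history is treated as dirty.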

    /// Create the sink table if it does not exist yet.
    pub async fn check_or_create_sink_table(
        &self,
        engine: &QueryEngineRef,
        frontend_client: &Arc<FrontendClient>,
    ) -> Result<Option<(u32, Duration)>, Error> {
        if !self.is_table_exist(&self.config.sink_table_name).await? {
            let create_table = self.gen_create_table_expr(engine.clone()).await?;
            info!(
                "Trying to create sink table (if not exists) with expr: {:?}",
                create_table
            );
            self.create_table(frontend_client, create_table).await?;
            info!(
                "Sink table {} (if not exists) created",
                self.config.sink_table_name.join(".")
            );
        }

        Ok(None)
    }

    async fn is_table_exist(&self, table_name: &[String; 3]) -> Result<bool, Error> {
        self.config
            .catalog_manager
            .table_exists(&table_name[0], &table_name[1], &table_name[2], None)
            .await
            .map_err(BoxedError::new)
            .context(ExternalSnafu)
    }

    /// Generate the insert plan and execute it once, returning the affected
    /// rows and elapsed time if a query was actually run.
    pub async fn gen_exec_once(
        &self,
        engine: &QueryEngineRef,
        frontend_client: &Arc<FrontendClient>,
    ) -> Result<Option<(u32, Duration)>, Error> {
        if let Some(new_query) = self.gen_insert_plan(engine).await? {
            debug!("Generated new query: {}", new_query);
            self.execute_logical_plan(frontend_client, &new_query).await
        } else {
            debug!("Generated no query");
            Ok(None)
        }
    }

    /// Generate an INSERT INTO plan targeting the sink table, or `None` when
    /// there is no new data to process.
    pub async fn gen_insert_plan(
        &self,
        engine: &QueryEngineRef,
    ) -> Result<Option<LogicalPlan>, Error> {
        let (table, df_schema) = get_table_info_df_schema(
            self.config.catalog_manager.clone(),
            self.config.sink_table_name.clone(),
        )
        .await?;

        let new_query = self
            .gen_query_with_time_window(engine.clone(), &table.meta.schema)
            .await?;

        let insert_into = if let Some((new_query, _column_cnt)) = new_query {
            // Ensure that every column produced by the query exists in the
            // sink table.
            let table_columns = df_schema
                .columns()
                .into_iter()
                .map(|c| c.name)
                .collect::<BTreeSet<_>>();
            for column in new_query.schema().columns() {
                ensure!(
                    table_columns.contains(column.name()),
                    InvalidQuerySnafu {
                        reason: format!(
                            "Column {} not found in sink table with columns {:?}",
                            column, table_columns
                        ),
                    }
                );
            }
            // Build an append-only insert into the fully qualified sink table.
            LogicalPlan::Dml(DmlStatement::new(
                datafusion_common::TableReference::Full {
                    catalog: self.config.sink_table_name[0].clone().into(),
                    schema: self.config.sink_table_name[1].clone().into(),
                    table: self.config.sink_table_name[2].clone().into(),
                },
                df_schema,
                WriteOp::Insert(datafusion_expr::dml::InsertOp::Append),
                Arc::new(new_query),
            ))
        } else {
            return Ok(None);
        };
        let insert_into = insert_into.recompute_schema().context(DatafusionSnafu {
            context: "Failed to recompute schema",
        })?;
        Ok(Some(insert_into))
    }
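
    // The plan built above is roughly equivalent to the following SQL (a
    // sketch; table and column names are illustrative):
    //
    //   INSERT INTO sink_catalog.sink_schema.sink_table
    //   SELECT ... FROM source_table
    //   WHERE ts >= '<window lower>' AND ts < '<window upper>';
    //
    // where the WHERE clause only appears when a time window filter could be
    // generated.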

    pub async fn create_table(
        &self,
        frontend_client: &Arc<FrontendClient>,
        expr: CreateTableExpr,
    ) -> Result<(), Error> {
        let catalog = &self.config.sink_table_name[0];
        let schema = &self.config.sink_table_name[1];
        frontend_client
            .create(expr.clone(), catalog, schema)
            .await?;
        Ok(())
    }

    /// Execute the given logical plan on a frontend, returning the affected
    /// rows and the elapsed time.
    pub async fn execute_logical_plan(
        &self,
        frontend_client: &Arc<FrontendClient>,
        plan: &LogicalPlan,
    ) -> Result<Option<(u32, Duration)>, Error> {
        let instant = Instant::now();
        let flow_id = self.config.flow_id;

        debug!(
            "Executing flow {flow_id}(expire_after={:?} secs) with query {}",
            self.config.expire_after, &plan
        );

        let catalog = &self.config.sink_table_name[0];
        let schema = &self.config.sink_table_name[1];

        // Fully qualify every table scan so the plan is unambiguous on the
        // frontend side.
        let fixed_plan = plan
            .clone()
            .transform_down_with_subqueries(|p| {
                if let LogicalPlan::TableScan(mut table_scan) = p {
                    let resolved = table_scan.table_name.resolve(catalog, schema);
                    table_scan.table_name = resolved.into();
                    Ok(Transformed::yes(LogicalPlan::TableScan(table_scan)))
                } else {
                    Ok(Transformed::no(p))
                }
            })
            .with_context(|_| DatafusionSnafu {
                context: format!("Failed to fix table ref in logical plan, plan={:?}", plan),
            })?
            .data;

        let expanded_plan = CountWildcardRule::new()
            .analyze(fixed_plan.clone(), &Default::default())
            .with_context(|_| DatafusionSnafu {
                context: format!(
                    "Failed to expand wildcard in logical plan, plan={:?}",
                    fixed_plan
                ),
            })?;

        let plan = expanded_plan;
        let mut peer_desc = None;

        let res = {
            let _timer = METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME
                .with_label_values(&[flow_id.to_string().as_str()])
                .start_timer();

            // If the plan is an INSERT, split off the target table so only the
            // input plan needs to be encoded.
            let req = if let Some((insert_to, insert_plan)) =
                breakup_insert_plan(&plan, catalog, schema)
            {
                let message = DFLogicalSubstraitConvertor {}
                    .encode(&insert_plan, DefaultSerializer)
                    .context(SubstraitEncodeLogicalPlanSnafu)?;
                api::v1::greptime_request::Request::Query(api::v1::QueryRequest {
                    query: Some(api::v1::query_request::Query::InsertIntoPlan(
                        api::v1::InsertIntoPlan {
                            table_name: Some(insert_to),
                            logical_plan: message.to_vec(),
                        },
                    )),
                })
            } else {
                let message = DFLogicalSubstraitConvertor {}
                    .encode(&plan, DefaultSerializer)
                    .context(SubstraitEncodeLogicalPlanSnafu)?;

                api::v1::greptime_request::Request::Query(api::v1::QueryRequest {
                    query: Some(api::v1::query_request::Query::LogicalPlan(message.to_vec())),
                })
            };

            frontend_client
                .handle(req, catalog, schema, &mut peer_desc)
                .await
        };

        let elapsed = instant.elapsed();
        if let Ok(affected_rows) = &res {
            debug!(
                "Flow {flow_id} executed, affected_rows: {affected_rows:?}, elapsed: {:?}",
                elapsed
            );
        } else if let Err(err) = &res {
            warn!(
                "Failed to execute Flow {flow_id} on frontend {:?}, result: {err:?}, elapsed: {:?} with query: {}",
                peer_desc, elapsed, &plan
            );
        }

        // Record queries that exceed the slow query threshold.
        if elapsed >= SLOW_QUERY_THRESHOLD {
            warn!(
                "Flow {flow_id} on frontend {:?} executed for {:?} before completing, query: {}",
                peer_desc, elapsed, &plan
            );
            METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY
                .with_label_values(&[
                    flow_id.to_string().as_str(),
                    &peer_desc.unwrap_or_default().to_string(),
                ])
                .observe(elapsed.as_secs_f64());
        }

        self.state
            .write()
            .unwrap()
            .after_query_exec(elapsed, res.is_ok());

        let res = res?;

        Ok(Some((res, elapsed)))
    }

    /// Run the task's periodic execution loop until a shutdown signal is
    /// received.
    pub async fn start_executing_loop(
        &self,
        engine: QueryEngineRef,
        frontend_client: Arc<FrontendClient>,
    ) {
        loop {
            // Check for a shutdown signal before each iteration.
            {
                let mut state = self.state.write().unwrap();
                match state.shutdown_rx.try_recv() {
                    Ok(()) => break,
                    Err(TryRecvError::Closed) => {
                        warn!(
                            "Shutdown channel for flow {} is closed unexpectedly, shutting down anyway",
                            self.config.flow_id
                        );
                        break;
                    }
                    Err(TryRecvError::Empty) => (),
                }
            }

            let new_query = match self.gen_insert_plan(&engine).await {
                Ok(new_query) => new_query,
                Err(err) => {
                    common_telemetry::error!(err; "Failed to generate query for flow={}", self.config.flow_id);
                    // Failed to generate a query; back off and retry later.
                    tokio::time::sleep(MIN_REFRESH_DURATION).await;
                    continue;
                }
            };

            let res = if let Some(new_query) = &new_query {
                self.execute_logical_plan(&frontend_client, new_query).await
            } else {
                Ok(None)
            };

            match res {
                // Executed successfully: sleep until the next scheduled start.
                Ok(Some(_)) => {
                    let sleep_until = {
                        let state = self.state.write().unwrap();

                        state.get_next_start_query_time(
                            self.config.flow_id,
                            &self
                                .config
                                .time_window_expr
                                .as_ref()
                                .and_then(|t| *t.time_window_size()),
                            Some(DEFAULT_BATCHING_ENGINE_QUERY_TIMEOUT),
                        )
                    };
                    tokio::time::sleep_until(sleep_until).await;
                }
                // No new data: back off briefly and check again.
                Ok(None) => {
                    debug!(
                        "Flow id = {:?} found no new data, sleep for {:?} then continue",
                        self.config.flow_id, MIN_REFRESH_DURATION
                    );
                    tokio::time::sleep(MIN_REFRESH_DURATION).await;
                    continue;
                }
                // Execution failed: log and back off before retrying.
                Err(err) => {
                    match new_query {
                        Some(query) => {
                            common_telemetry::error!(err; "Failed to execute query for flow={} with query: {query}", self.config.flow_id)
                        }
                        None => {
                            common_telemetry::error!(err; "Failed to generate query for flow={}", self.config.flow_id)
                        }
                    }
                    tokio::time::sleep(MIN_REFRESH_DURATION).await;
                }
            }
        }
    }
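
    // Timing sketch of the loop above (durations are illustrative): after a
    // successful run, the next start time is derived from the flow's time
    // window size and DEFAULT_BATCHING_ENGINE_QUERY_TIMEOUT; on "no new data"
    // or on error, the loop backs off for MIN_REFRESH_DURATION before the next
    // attempt, so a transiently failing flow retries at a bounded rate.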

    /// Generate a CREATE TABLE expr for the sink table from the output schema
    /// of the flow's query.
    async fn gen_create_table_expr(
        &self,
        engine: QueryEngineRef,
    ) -> Result<CreateTableExpr, Error> {
        let query_ctx = self.state.read().unwrap().query_ctx.clone();
        let plan =
            sql_to_df_plan(query_ctx.clone(), engine.clone(), &self.config.query, true).await?;
        create_table_with_expr(&plan, &self.config.sink_table_name)
    }

    /// Generate the query plan, restricted to dirty time windows when a time
    /// window expr is available; returns the plan and its column count.
    async fn gen_query_with_time_window(
        &self,
        engine: QueryEngineRef,
        sink_table_schema: &Arc<Schema>,
    ) -> Result<Option<(LogicalPlan, usize)>, Error> {
        let query_ctx = self.state.read().unwrap().query_ctx.clone();
        let start = SystemTime::now();
        let since_the_epoch = start
            .duration_since(UNIX_EPOCH)
            .expect("Time went backwards");
        let low_bound = self
            .config
            .expire_after
            .map(|e| since_the_epoch.as_secs() - e as u64)
            .unwrap_or(u64::MIN);

        let low_bound = Timestamp::new_second(low_bound as i64);
        let schema_len = self.config.output_schema.fields().len();

        let expire_time_window_bound = self
            .config
            .time_window_expr
            .as_ref()
            .map(|expr| expr.eval(low_bound))
            .transpose()?;

        let (Some((Some(l), Some(u))), QueryType::Sql) =
            (expire_time_window_bound, &self.config.query_type)
        else {
            // Either the time window bounds are unknown or the query is TQL:
            // fall back to running the original query unchanged.
            debug!(
                "Flow id = {:?}, can't get window size: precise_lower_bound={expire_time_window_bound:?}, using the same query", self.config.flow_id
            );
            self.state.write().unwrap().dirty_time_windows.clean();

            let mut add_auto_column = AddAutoColumnRewriter::new(sink_table_schema.clone());

            let plan = sql_to_df_plan(query_ctx.clone(), engine.clone(), &self.config.query, false)
                .await?;

            let plan = plan
                .clone()
                .rewrite(&mut add_auto_column)
                .with_context(|_| DatafusionSnafu {
                    context: format!("Failed to rewrite plan:\n {}\n", plan),
                })?
                .data;
            let schema_len = plan.schema().fields().len();

            return Ok(Some((plan, schema_len)));
        };

        debug!(
            "Flow id = {:?}, found time window: precise_lower_bound={:?}, precise_upper_bound={:?}",
            self.config.flow_id, l, u
        );
        let window_size = u.sub(&l).with_context(|| UnexpectedSnafu {
            reason: format!("Can't get window size from {u:?} - {l:?}"),
        })?;
        let col_name = self
            .config
            .time_window_expr
            .as_ref()
            .map(|expr| expr.column_name.clone())
            .with_context(|| UnexpectedSnafu {
                reason: format!(
                    "Flow id={:?}, Failed to get column name from time window expr",
                    self.config.flow_id
                ),
            })?;

        let expr = self
            .state
            .write()
            .unwrap()
            .dirty_time_windows
            .gen_filter_exprs(
                &col_name,
                Some(l),
                window_size,
                DirtyTimeWindows::MAX_FILTER_NUM,
                self.config.flow_id,
                Some(self),
            )?;

        debug!(
            "Flow id={:?}, Generated filter expr: {:?}",
            self.config.flow_id,
            expr.as_ref()
                .map(|expr| expr_to_sql(expr).with_context(|_| DatafusionSnafu {
                    context: format!("Failed to generate filter expr from {expr:?}"),
                }))
                .transpose()?
                .map(|s| s.to_string())
        );

        let Some(expr) = expr else {
            // No dirty windows means no new data, hence nothing to update.
            debug!("Flow id={:?}, no new data, skip update", self.config.flow_id);
            return Ok(None);
        };

        let mut add_filter = AddFilterRewriter::new(expr);
        let mut add_auto_column = AddAutoColumnRewriter::new(sink_table_schema.clone());

        let plan =
            sql_to_df_plan(query_ctx.clone(), engine.clone(), &self.config.query, false).await?;
        let rewrite = plan
            .clone()
            .rewrite(&mut add_filter)
            .and_then(|p| p.data.rewrite(&mut add_auto_column))
            .with_context(|_| DatafusionSnafu {
                context: format!("Failed to rewrite plan:\n {}\n", plan),
            })?
            .data;
        let new_plan = apply_df_optimizer(rewrite).await?;

        Ok(Some((new_plan, schema_len)))
    }
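
    // Filter sketch (illustrative): for a time window column `ts`, a 5-minute
    // window size, and two dirty windows, the generated filter unparses to
    // something like
    //
    //   ts >= '2024-01-01 00:00:00' AND ts < '2024-01-01 00:05:00'
    //   OR ts >= '2024-01-01 00:10:00' AND ts < '2024-01-01 00:15:00'
    //
    // which `AddFilterRewriter` then injects into the original query.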
}

/// Build a `CreateTableExpr` for the sink table from the plan's output schema,
/// appending an `update_at` column and, when no timestamp column exists, a
/// placeholder time index column.
fn create_table_with_expr(
    plan: &LogicalPlan,
    sink_table_name: &[String; 3],
) -> Result<CreateTableExpr, Error> {
    let fields = plan.schema().fields();
    let (first_time_stamp, primary_keys) = build_primary_key_constraint(plan, fields)?;

    let mut column_schemas = Vec::new();
    for field in fields {
        let name = field.name();
        let ty = ConcreteDataType::from_arrow_type(field.data_type());
        let col_schema = if first_time_stamp == Some(name.clone()) {
            ColumnSchema::new(name, ty, false).with_time_index(true)
        } else {
            ColumnSchema::new(name, ty, true)
        };
        column_schemas.push(col_schema);
    }

    let update_at_schema = ColumnSchema::new(
        AUTO_CREATED_UPDATE_AT_TS_COL,
        ConcreteDataType::timestamp_millisecond_datatype(),
        true,
    );
    column_schemas.push(update_at_schema);

    let time_index = if let Some(time_index) = first_time_stamp {
        time_index
    } else {
        column_schemas.push(
            ColumnSchema::new(
                AUTO_CREATED_PLACEHOLDER_TS_COL,
                ConcreteDataType::timestamp_millisecond_datatype(),
                false,
            )
            .with_time_index(true),
        );
        AUTO_CREATED_PLACEHOLDER_TS_COL.to_string()
    };

    let column_defs =
        column_schemas_to_defs(column_schemas, &primary_keys).context(ConvertColumnSchemaSnafu)?;
    Ok(CreateTableExpr {
        catalog_name: sink_table_name[0].clone(),
        schema_name: sink_table_name[1].clone(),
        table_name: sink_table_name[2].clone(),
        desc: "Auto created table by flow engine".to_string(),
        column_defs,
        time_index,
        primary_keys,
        create_if_not_exists: true,
        table_options: Default::default(),
        table_id: None,
        engine: "mito".to_string(),
    })
}
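
// Output sketch (illustrative; the exact SQL rendering is up to the receiving
// frontend): for `SELECT max(number), ts FROM t GROUP BY ts`, the generated
// expr corresponds roughly to
//
//   CREATE TABLE IF NOT EXISTS sink_table (
//       "max(t.number)" <type of number> NULL,
//       "ts" TIMESTAMP(3) NOT NULL,
//       "update_at" TIMESTAMP(3) NULL,
//       TIME INDEX ("ts")
//   ) ENGINE = mito;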

/// Derive the time index and primary keys from the plan's GROUP BY columns:
/// the first timestamp-typed group-by column (in schema order) becomes the
/// time index, and the remaining group-by columns become primary keys.
///
/// Returns `(Option<time index column>, Vec<primary key columns>)`.
fn build_primary_key_constraint(
    plan: &LogicalPlan,
    schema: &Fields,
) -> Result<(Option<String>, Vec<String>), Error> {
    let mut pk_names = FindGroupByFinalName::default();

    plan.visit(&mut pk_names)
        .with_context(|_| DatafusionSnafu {
            context: format!("Can't find aggr expr in plan {plan:?}"),
        })?;

    // No GROUP BY means no primary keys and no inferred time index.
    let pk_final_names = pk_names.get_group_expr_names().unwrap_or_default();
    if pk_final_names.is_empty() {
        return Ok((None, Vec::new()));
    }

    let all_pk_cols: Vec<_> = schema
        .iter()
        .filter(|f| pk_final_names.contains(f.name()))
        .map(|f| f.name().clone())
        .collect();
    // The first timestamp-typed group-by column serves as the time index.
    let first_time_stamp = schema
        .iter()
        .find(|f| {
            all_pk_cols.contains(&f.name().clone())
                && ConcreteDataType::from_arrow_type(f.data_type()).is_timestamp()
        })
        .map(|f| f.name().clone());

    let all_pk_cols: Vec<_> = all_pk_cols
        .into_iter()
        .filter(|col| first_time_stamp != Some(col.to_string()))
        .collect();

    Ok((first_time_stamp, all_pk_cols))
}
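
// Worked example (a sketch): for `SELECT host, avg(cpu), ts FROM metrics
// GROUP BY host, ts`, the group-by columns are {host, ts}; `ts` is the first
// timestamp-typed one, so it becomes the time index and drops out of the
// primary keys, giving `(Some("ts"), vec!["host"])`.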

#[cfg(test)]
mod test {
    use api::v1::column_def::try_as_column_schema;
    use pretty_assertions::assert_eq;
    use session::context::QueryContext;

    use super::*;
    use crate::test_utils::create_test_query_engine;

    #[tokio::test]
    async fn test_gen_create_table_sql() {
        let query_engine = create_test_query_engine();
        let ctx = QueryContext::arc();
        struct TestCase {
            sql: String,
            sink_table_name: String,
            column_schemas: Vec<ColumnSchema>,
            primary_keys: Vec<String>,
            time_index: String,
        }

        let update_at_schema = ColumnSchema::new(
            AUTO_CREATED_UPDATE_AT_TS_COL,
            ConcreteDataType::timestamp_millisecond_datatype(),
            true,
        );

        let ts_placeholder_schema = ColumnSchema::new(
            AUTO_CREATED_PLACEHOLDER_TS_COL,
            ConcreteDataType::timestamp_millisecond_datatype(),
            false,
        )
        .with_time_index(true);

        let testcases = vec![
            TestCase {
                sql: "SELECT number, ts FROM numbers_with_ts".to_string(),
                sink_table_name: "new_table".to_string(),
                column_schemas: vec![
                    ColumnSchema::new("number", ConcreteDataType::uint32_datatype(), true),
                    ColumnSchema::new(
                        "ts",
                        ConcreteDataType::timestamp_millisecond_datatype(),
                        true,
                    ),
                    update_at_schema.clone(),
                    ts_placeholder_schema.clone(),
                ],
                primary_keys: vec![],
                time_index: AUTO_CREATED_PLACEHOLDER_TS_COL.to_string(),
            },
            TestCase {
                sql: "SELECT number, max(ts) FROM numbers_with_ts GROUP BY number".to_string(),
                sink_table_name: "new_table".to_string(),
                column_schemas: vec![
                    ColumnSchema::new("number", ConcreteDataType::uint32_datatype(), true),
                    ColumnSchema::new(
                        "max(numbers_with_ts.ts)",
                        ConcreteDataType::timestamp_millisecond_datatype(),
                        true,
                    ),
                    update_at_schema.clone(),
                    ts_placeholder_schema.clone(),
                ],
                primary_keys: vec!["number".to_string()],
                time_index: AUTO_CREATED_PLACEHOLDER_TS_COL.to_string(),
            },
            TestCase {
                sql: "SELECT max(number), ts FROM numbers_with_ts GROUP BY ts".to_string(),
                sink_table_name: "new_table".to_string(),
                column_schemas: vec![
                    ColumnSchema::new(
                        "max(numbers_with_ts.number)",
                        ConcreteDataType::uint32_datatype(),
                        true,
                    ),
                    ColumnSchema::new(
                        "ts",
                        ConcreteDataType::timestamp_millisecond_datatype(),
                        false,
                    )
                    .with_time_index(true),
                    update_at_schema.clone(),
                ],
                primary_keys: vec![],
                time_index: "ts".to_string(),
            },
            TestCase {
                sql: "SELECT number, ts FROM numbers_with_ts GROUP BY ts, number".to_string(),
                sink_table_name: "new_table".to_string(),
                column_schemas: vec![
                    ColumnSchema::new("number", ConcreteDataType::uint32_datatype(), true),
                    ColumnSchema::new(
                        "ts",
                        ConcreteDataType::timestamp_millisecond_datatype(),
                        false,
                    )
                    .with_time_index(true),
                    update_at_schema.clone(),
                ],
                primary_keys: vec!["number".to_string()],
                time_index: "ts".to_string(),
            },
        ];

        for tc in testcases {
            let plan = sql_to_df_plan(ctx.clone(), query_engine.clone(), &tc.sql, true)
                .await
                .unwrap();
            let expr = create_table_with_expr(
                &plan,
                &[
                    "greptime".to_string(),
                    "public".to_string(),
                    tc.sink_table_name.clone(),
                ],
            )
            .unwrap();
            // Check that the generated expr matches the expected column
            // schemas, primary keys, and time index.
            let column_schemas = expr
                .column_defs
                .iter()
                .map(|c| try_as_column_schema(c).unwrap())
                .collect::<Vec<_>>();
            assert_eq!(tc.column_schemas, column_schemas);
            assert_eq!(tc.primary_keys, expr.primary_keys);
            assert_eq!(tc.time_index, expr.time_index);
        }
    }
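
    // A minimal sketch of query classification, assuming the default dialect
    // of `QueryContext::arc()` accepts both plain SQL and TQL statements.
    #[test]
    fn test_determine_query_type() {
        let ctx = QueryContext::arc();

        let query_type = determine_query_type("SELECT 1", &ctx).unwrap();
        assert!(matches!(query_type, QueryType::Sql));

        let query_type =
            determine_query_type("TQL EVAL (0, 10, '5s') test_metric", &ctx).unwrap();
        assert!(matches!(query_type, QueryType::Tql));
    }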
}