use std::collections::{BTreeSet, HashSet};
use std::ops::Deref;
use std::sync::{Arc, RwLock};
use std::time::{Duration, SystemTime, UNIX_EPOCH};

use api::v1::CreateTableExpr;
use arrow_schema::Fields;
use catalog::CatalogManagerRef;
use common_error::ext::BoxedError;
use common_query::logical_plan::breakup_insert_plan;
use common_telemetry::tracing::warn;
use common_telemetry::{debug, info};
use common_time::Timestamp;
use datafusion::optimizer::analyzer::count_wildcard_rule::CountWildcardRule;
use datafusion::optimizer::AnalyzerRule;
use datafusion::sql::unparser::expr_to_sql;
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_expr::{DmlStatement, LogicalPlan, WriteOp};
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnSchema, Schema};
use operator::expr_helper::column_schemas_to_defs;
use query::query_engine::DefaultSerializer;
use query::QueryEngineRef;
use session::context::QueryContextRef;
use snafu::{ensure, OptionExt, ResultExt};
use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
use tokio::sync::oneshot;
use tokio::sync::oneshot::error::TryRecvError;
use tokio::time::Instant;

use crate::adapter::{AUTO_CREATED_PLACEHOLDER_TS_COL, AUTO_CREATED_UPDATE_AT_TS_COL};
use crate::batching_mode::frontend_client::FrontendClient;
use crate::batching_mode::state::TaskState;
use crate::batching_mode::time_window::TimeWindowExpr;
use crate::batching_mode::utils::{
    get_table_info_df_schema, sql_to_df_plan, AddAutoColumnRewriter, AddFilterRewriter,
    FindGroupByFinalName,
};
use crate::batching_mode::{
    DEFAULT_BATCHING_ENGINE_QUERY_TIMEOUT, MIN_REFRESH_DURATION, SLOW_QUERY_THRESHOLD,
};
use crate::df_optimizer::apply_df_optimizer;
use crate::error::{
    ConvertColumnSchemaSnafu, DatafusionSnafu, ExternalSnafu, InvalidQuerySnafu,
    SubstraitEncodeLogicalPlanSnafu, UnexpectedSnafu,
};
use crate::metrics::{
    METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME, METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY,
};
use crate::{Error, FlowId};

#[derive(Clone)]
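/// Static configuration of a batching mode flow task, shared by all clones of the task.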
pub struct TaskConfig {
    pub flow_id: FlowId,
    pub query: String,
    plan: Arc<LogicalPlan>,
    pub time_window_expr: Option<TimeWindowExpr>,
    pub expire_after: Option<i64>,
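    /// The sink table that query results are inserted into, as `[catalog, schema, table]`.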
    sink_table_name: [String; 3],
    pub source_table_names: HashSet<[String; 3]>,
    catalog_manager: CatalogManagerRef,
}

#[derive(Clone)]
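/// A batching mode flow task that periodically re-runs the flow's query and writes the results
/// into the sink table.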
pub struct BatchingTask {
    pub config: Arc<TaskConfig>,
    pub state: Arc<RwLock<TaskState>>,
}

impl BatchingTask {
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        flow_id: FlowId,
        query: &str,
        plan: LogicalPlan,
        time_window_expr: Option<TimeWindowExpr>,
        expire_after: Option<i64>,
        sink_table_name: [String; 3],
        source_table_names: Vec<[String; 3]>,
        query_ctx: QueryContextRef,
        catalog_manager: CatalogManagerRef,
        shutdown_rx: oneshot::Receiver<()>,
    ) -> Self {
        Self {
            config: Arc::new(TaskConfig {
                flow_id,
                query: query.to_string(),
                plan: Arc::new(plan),
                time_window_expr,
                expire_after,
                sink_table_name,
                source_table_names: source_table_names.into_iter().collect(),
                catalog_manager,
            }),
            state: Arc::new(RwLock::new(TaskState::new(query_ctx, shutdown_rx))),
        }
    }

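    /// Marks the whole range from the expiration lower bound (or timestamp 0 when no
    /// `expire_after` is set) up to now as dirty, so it will be re-queried on the next run.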
    pub fn mark_all_windows_as_dirty(&self) -> Result<(), Error> {
        let now = SystemTime::now();
        let now = Timestamp::new_second(
            now.duration_since(UNIX_EPOCH)
                .expect("Time went backwards")
                .as_secs() as _,
        );
        let lower_bound = self
            .config
            .expire_after
            .map(|e| now.sub_duration(Duration::from_secs(e as _)))
            .transpose()
            .map_err(BoxedError::new)
            .context(ExternalSnafu)?
            .unwrap_or(Timestamp::new_second(0));
        debug!(
            "Flow {} mark range ({:?}, {:?}) as dirty",
            self.config.flow_id, lower_bound, now
        );
        self.state
            .write()
            .unwrap()
            .dirty_time_windows
            .add_window(lower_bound, Some(now));
        Ok(())
    }

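    /// Creates the sink table with `CREATE TABLE IF NOT EXISTS` semantics when it does not
    /// already exist, deriving its schema from the flow's query.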
    pub async fn check_or_create_sink_table(
        &self,
        engine: &QueryEngineRef,
        frontend_client: &Arc<FrontendClient>,
    ) -> Result<Option<(u32, Duration)>, Error> {
        if !self.is_table_exist(&self.config.sink_table_name).await? {
            let create_table = self.gen_create_table_expr(engine.clone()).await?;
            info!(
                "Try creating sink table(if not exists) with expr: {:?}",
                create_table
            );
            self.create_table(frontend_client, create_table).await?;
            info!(
                "Sink table {}(if not exists) created",
                self.config.sink_table_name.join(".")
            );
        }

        Ok(None)
    }

    async fn is_table_exist(&self, table_name: &[String; 3]) -> Result<bool, Error> {
        self.config
            .catalog_manager
            .table_exists(&table_name[0], &table_name[1], &table_name[2], None)
            .await
            .map_err(BoxedError::new)
            .context(ExternalSnafu)
    }

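    /// Generates the insert plan once and executes it, returning the affected rows and the
    /// elapsed time, or `None` when there is nothing to query.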
    pub async fn gen_exec_once(
        &self,
        engine: &QueryEngineRef,
        frontend_client: &Arc<FrontendClient>,
    ) -> Result<Option<(u32, Duration)>, Error> {
        if let Some(new_query) = self.gen_insert_plan(engine).await? {
            debug!("Generate new query: {}", new_query);
            self.execute_logical_plan(frontend_client, &new_query).await
        } else {
            debug!("Generate no query");
            Ok(None)
        }
    }

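    /// Builds an `INSERT INTO <sink_table> ... SELECT ...` logical plan restricted to the dirty
    /// time windows, after checking that every output column exists in the sink table.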
    pub async fn gen_insert_plan(
        &self,
        engine: &QueryEngineRef,
    ) -> Result<Option<LogicalPlan>, Error> {
        let (table, df_schema) = get_table_info_df_schema(
            self.config.catalog_manager.clone(),
            self.config.sink_table_name.clone(),
        )
        .await?;

        let new_query = self
            .gen_query_with_time_window(engine.clone(), &table.meta.schema)
            .await?;

        let insert_into = if let Some((new_query, _column_cnt)) = new_query {
            let table_columns = df_schema
                .columns()
                .into_iter()
                .map(|c| c.name)
                .collect::<BTreeSet<_>>();
            for column in new_query.schema().columns() {
                ensure!(
                    table_columns.contains(column.name()),
                    InvalidQuerySnafu {
                        reason: format!(
                            "Column {} not found in sink table with columns {:?}",
                            column, table_columns
                        ),
                    }
                );
            }
            LogicalPlan::Dml(DmlStatement::new(
                datafusion_common::TableReference::Full {
                    catalog: self.config.sink_table_name[0].clone().into(),
                    schema: self.config.sink_table_name[1].clone().into(),
                    table: self.config.sink_table_name[2].clone().into(),
                },
                df_schema,
                WriteOp::Insert(datafusion_expr::dml::InsertOp::Append),
                Arc::new(new_query),
            ))
        } else {
            return Ok(None);
        };
        let insert_into = insert_into.recompute_schema().context(DatafusionSnafu {
            context: "Failed to recompute schema",
        })?;
        Ok(Some(insert_into))
    }

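    /// Asks the frontend to create the sink table from the given expression.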
    pub async fn create_table(
        &self,
        frontend_client: &Arc<FrontendClient>,
        expr: CreateTableExpr,
    ) -> Result<(), Error> {
        let catalog = &self.config.sink_table_name[0];
        let schema = &self.config.sink_table_name[1];
        frontend_client
            .create(expr.clone(), catalog, schema)
            .await?;
        Ok(())
    }

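    /// Executes the plan on the frontend, resolving table references against the sink
    /// catalog/schema first, and records query timing metrics (including slow queries).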
    pub async fn execute_logical_plan(
        &self,
        frontend_client: &Arc<FrontendClient>,
        plan: &LogicalPlan,
    ) -> Result<Option<(u32, Duration)>, Error> {
        let instant = Instant::now();
        let flow_id = self.config.flow_id;

        debug!(
            "Executing flow {flow_id}(expire_after={:?} secs) with query {}",
            self.config.expire_after, &plan
        );

        let catalog = &self.config.sink_table_name[0];
        let schema = &self.config.sink_table_name[1];

        let fixed_plan = plan
            .clone()
            .transform_down_with_subqueries(|p| {
                if let LogicalPlan::TableScan(mut table_scan) = p {
                    let resolved = table_scan.table_name.resolve(catalog, schema);
                    table_scan.table_name = resolved.into();
                    Ok(Transformed::yes(LogicalPlan::TableScan(table_scan)))
                } else {
                    Ok(Transformed::no(p))
                }
            })
            .with_context(|_| DatafusionSnafu {
                context: format!("Failed to fix table ref in logical plan, plan={:?}", plan),
            })?
            .data;

        let expanded_plan = CountWildcardRule::new()
            .analyze(fixed_plan.clone(), &Default::default())
            .with_context(|_| DatafusionSnafu {
                context: format!(
                    "Failed to expand wildcard in logical plan, plan={:?}",
                    fixed_plan
                ),
            })?;

        let plan = expanded_plan;
        let mut peer_desc = None;

        let res = {
            let _timer = METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME
                .with_label_values(&[flow_id.to_string().as_str()])
                .start_timer();

            let req = if let Some((insert_to, insert_plan)) =
                breakup_insert_plan(&plan, catalog, schema)
            {
                let message = DFLogicalSubstraitConvertor {}
                    .encode(&insert_plan, DefaultSerializer)
                    .context(SubstraitEncodeLogicalPlanSnafu)?;
                api::v1::greptime_request::Request::Query(api::v1::QueryRequest {
                    query: Some(api::v1::query_request::Query::InsertIntoPlan(
                        api::v1::InsertIntoPlan {
                            table_name: Some(insert_to),
                            logical_plan: message.to_vec(),
                        },
                    )),
                })
            } else {
                let message = DFLogicalSubstraitConvertor {}
                    .encode(&plan, DefaultSerializer)
                    .context(SubstraitEncodeLogicalPlanSnafu)?;

                api::v1::greptime_request::Request::Query(api::v1::QueryRequest {
                    query: Some(api::v1::query_request::Query::LogicalPlan(message.to_vec())),
                })
            };

            frontend_client
                .handle(req, catalog, schema, &mut peer_desc)
                .await
        };

        let elapsed = instant.elapsed();
        if let Ok(affected_rows) = &res {
            debug!(
                "Flow {flow_id} executed, affected_rows: {affected_rows:?}, elapsed: {:?}",
                elapsed
            );
        } else if let Err(err) = &res {
            warn!(
                "Failed to execute Flow {flow_id} on frontend {:?}, result: {err:?}, elapsed: {:?} with query: {}",
                peer_desc, elapsed, &plan
            );
        }

        if elapsed >= SLOW_QUERY_THRESHOLD {
            warn!(
                "Flow {flow_id} on frontend {:?} executed for {:?} before complete, query: {}",
                peer_desc, elapsed, &plan
            );
            METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY
                .with_label_values(&[
                    flow_id.to_string().as_str(),
                    &plan.to_string(),
                    &peer_desc.unwrap_or_default().to_string(),
                ])
                .observe(elapsed.as_secs_f64());
        }

        self.state
            .write()
            .unwrap()
            .after_query_exec(elapsed, res.is_ok());

        let res = res?;

        Ok(Some((res, elapsed)))
    }

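    /// The main loop of the task: repeatedly generate and execute the query, then sleep until
    /// the next scheduled run. Exits when the shutdown channel fires or is closed.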
    pub async fn start_executing_loop(
        &self,
        engine: QueryEngineRef,
        frontend_client: Arc<FrontendClient>,
    ) {
        loop {
            let mut new_query = None;
            let mut gen_and_exec = async || {
                new_query = self.gen_insert_plan(&engine).await?;
                if let Some(new_query) = &new_query {
                    self.execute_logical_plan(&frontend_client, new_query).await
                } else {
                    Ok(None)
                }
            };
            match gen_and_exec().await {
                Ok(Some(_)) => {
                    let sleep_until = {
                        let mut state = self.state.write().unwrap();
                        match state.shutdown_rx.try_recv() {
                            Ok(()) => break,
                            Err(TryRecvError::Closed) => {
                                warn!(
                                    "Unexpected shutdown flow {}, shutdown anyway",
                                    self.config.flow_id
                                );
                                break;
                            }
                            Err(TryRecvError::Empty) => (),
                        }
                        state.get_next_start_query_time(
                            self.config.flow_id,
                            Some(DEFAULT_BATCHING_ENGINE_QUERY_TIMEOUT),
                        )
                    };
                    tokio::time::sleep_until(sleep_until).await;
                }
                Ok(None) => {
                    debug!(
                        "Flow id = {:?} found no new data, sleep for {:?} then continue",
                        self.config.flow_id, MIN_REFRESH_DURATION
                    );
                    tokio::time::sleep(MIN_REFRESH_DURATION).await;
                    continue;
                }
                Err(err) => {
                    match new_query {
                        Some(query) => {
                            common_telemetry::error!(err; "Failed to execute query for flow={} with query: {query}", self.config.flow_id)
                        }
                        None => {
                            common_telemetry::error!(err; "Failed to generate query for flow={}", self.config.flow_id)
                        }
                    }
                    tokio::time::sleep(MIN_REFRESH_DURATION).await;
                }
            }
        }
    }

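    /// Derives the sink table's `CreateTableExpr` from the plan of the flow's query.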
    async fn gen_create_table_expr(
        &self,
        engine: QueryEngineRef,
    ) -> Result<CreateTableExpr, Error> {
        let query_ctx = self.state.read().unwrap().query_ctx.clone();
        let plan =
            sql_to_df_plan(query_ctx.clone(), engine.clone(), &self.config.query, true).await?;
        create_table_with_expr(&plan, &self.config.sink_table_name)
    }

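    /// Rewrites the flow's query to scan only the dirty time windows (when a time window expr
    /// is available) and to carry the auto columns expected by the sink table. Returns the
    /// rewritten plan and its expected column count, or `None` when there is no new data.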
    async fn gen_query_with_time_window(
        &self,
        engine: QueryEngineRef,
        sink_table_schema: &Arc<Schema>,
    ) -> Result<Option<(LogicalPlan, usize)>, Error> {
        let query_ctx = self.state.read().unwrap().query_ctx.clone();
        let start = SystemTime::now();
        let since_the_epoch = start
            .duration_since(UNIX_EPOCH)
            .expect("Time went backwards");
        let low_bound = self
            .config
            .expire_after
            .map(|e| since_the_epoch.as_secs() - e as u64)
            .unwrap_or(u64::MIN);

        let low_bound = Timestamp::new_second(low_bound as i64);
        let schema_len = self.config.plan.schema().fields().len();

        let expire_time_window_bound = self
            .config
            .time_window_expr
            .as_ref()
            .map(|expr| expr.eval(low_bound))
            .transpose()?;

        let new_plan = {
            let expr = {
                match expire_time_window_bound {
                    Some((Some(l), Some(u))) => {
                        let window_size = u.sub(&l).with_context(|| UnexpectedSnafu {
                            reason: format!("Can't get window size from {u:?} - {l:?}"),
                        })?;
                        let col_name = self
                            .config
                            .time_window_expr
                            .as_ref()
                            .map(|expr| expr.column_name.clone())
                            .with_context(|| UnexpectedSnafu {
                                reason: format!(
                                    "Flow id={:?}, Failed to get column name from time window expr",
                                    self.config.flow_id
                                ),
                            })?;

                        self.state
                            .write()
                            .unwrap()
                            .dirty_time_windows
                            .gen_filter_exprs(
                                &col_name,
                                Some(l),
                                window_size,
                                self.config.flow_id,
                                Some(self),
                            )?
                    }
                    _ => {
                        debug!(
                            "Flow id = {:?}, can't get window size: precise_lower_bound={expire_time_window_bound:?}, using the same query", self.config.flow_id
                        );
                        self.state.write().unwrap().dirty_time_windows.clean();

                        let mut add_auto_column =
                            AddAutoColumnRewriter::new(sink_table_schema.clone());
                        let plan = self
                            .config
                            .plan
                            .deref()
                            .clone()
                            .rewrite(&mut add_auto_column)
                            .with_context(|_| DatafusionSnafu {
                                context: format!(
                                    "Failed to rewrite plan:\n {}\n",
                                    self.config.plan
                                ),
                            })?
                            .data;
                        let schema_len = plan.schema().fields().len();

                        return Ok(Some((plan, schema_len)));
                    }
                }
            };

            debug!(
                "Flow id={:?}, Generated filter expr: {:?}",
                self.config.flow_id,
                expr.as_ref()
                    .map(|expr| expr_to_sql(expr).with_context(|_| DatafusionSnafu {
                        context: format!("Failed to generate filter expr from {expr:?}"),
                    }))
                    .transpose()?
                    .map(|s| s.to_string())
            );

            let Some(expr) = expr else {
                debug!("Flow id={:?}, no new data, not update", self.config.flow_id);
                return Ok(None);
            };

            let mut add_filter = AddFilterRewriter::new(expr);
            let mut add_auto_column = AddAutoColumnRewriter::new(sink_table_schema.clone());

            let plan = sql_to_df_plan(query_ctx.clone(), engine.clone(), &self.config.query, false)
                .await?;
            let rewrite = plan
                .clone()
                .rewrite(&mut add_filter)
                .and_then(|p| p.data.rewrite(&mut add_auto_column))
                .with_context(|_| DatafusionSnafu {
                    context: format!("Failed to rewrite plan:\n {}\n", plan),
                })?
                .data;
            apply_df_optimizer(rewrite).await?
        };

        Ok(Some((new_plan, schema_len)))
    }
}

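/// Builds a `CreateTableExpr` for the sink table from the plan's output schema: group-by columns
/// become the primary key, the first timestamp-typed group-by column (if any) becomes the time
/// index, and an `update_at` column plus, when needed, a placeholder time index are appended.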
fn create_table_with_expr(
    plan: &LogicalPlan,
    sink_table_name: &[String; 3],
) -> Result<CreateTableExpr, Error> {
    let fields = plan.schema().fields();
    let (first_time_stamp, primary_keys) = build_primary_key_constraint(plan, fields)?;

    let mut column_schemas = Vec::new();
    for field in fields {
        let name = field.name();
        let ty = ConcreteDataType::from_arrow_type(field.data_type());
        let col_schema = if first_time_stamp == Some(name.clone()) {
            ColumnSchema::new(name, ty, false).with_time_index(true)
        } else {
            ColumnSchema::new(name, ty, true)
        };
        column_schemas.push(col_schema);
    }

    let update_at_schema = ColumnSchema::new(
        AUTO_CREATED_UPDATE_AT_TS_COL,
        ConcreteDataType::timestamp_millisecond_datatype(),
        true,
    );
    column_schemas.push(update_at_schema);

    let time_index = if let Some(time_index) = first_time_stamp {
        time_index
    } else {
        column_schemas.push(
            ColumnSchema::new(
                AUTO_CREATED_PLACEHOLDER_TS_COL,
                ConcreteDataType::timestamp_millisecond_datatype(),
                false,
            )
            .with_time_index(true),
        );
        AUTO_CREATED_PLACEHOLDER_TS_COL.to_string()
    };

    let column_defs =
        column_schemas_to_defs(column_schemas, &primary_keys).context(ConvertColumnSchemaSnafu)?;
    Ok(CreateTableExpr {
        catalog_name: sink_table_name[0].clone(),
        schema_name: sink_table_name[1].clone(),
        table_name: sink_table_name[2].clone(),
        desc: "Auto created table by flow engine".to_string(),
        column_defs,
        time_index,
        primary_keys,
        create_if_not_exists: true,
        table_options: Default::default(),
        table_id: None,
        engine: "mito".to_string(),
    })
}

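/// Extracts the final group-by column names from the plan and splits them into the time index
/// candidate and the primary keys: returns the first timestamp-typed group-by column (if any)
/// plus the remaining group-by columns, or `(None, vec![])` when the plan has no aggregation.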
fn build_primary_key_constraint(
    plan: &LogicalPlan,
    schema: &Fields,
) -> Result<(Option<String>, Vec<String>), Error> {
    let mut pk_names = FindGroupByFinalName::default();

    plan.visit(&mut pk_names)
        .with_context(|_| DatafusionSnafu {
            context: format!("Can't find aggr expr in plan {plan:?}"),
        })?;

    let pk_final_names = pk_names.get_group_expr_names().unwrap_or_default();
    if pk_final_names.is_empty() {
        return Ok((None, Vec::new()));
    }

    let all_pk_cols: Vec<_> = schema
        .iter()
        .filter(|f| pk_final_names.contains(f.name()))
        .map(|f| f.name().clone())
        .collect();
    let first_time_stamp = schema
        .iter()
        .find(|f| {
            all_pk_cols.contains(&f.name().clone())
                && ConcreteDataType::from_arrow_type(f.data_type()).is_timestamp()
        })
        .map(|f| f.name().clone());

    let all_pk_cols: Vec<_> = all_pk_cols
        .into_iter()
        .filter(|col| first_time_stamp != Some(col.to_string()))
        .collect();

    Ok((first_time_stamp, all_pk_cols))
}

#[cfg(test)]
mod test {
    use api::v1::column_def::try_as_column_schema;
    use pretty_assertions::assert_eq;
    use session::context::QueryContext;

    use super::*;
    use crate::test_utils::create_test_query_engine;

    #[tokio::test]
    async fn test_gen_create_table_sql() {
        let query_engine = create_test_query_engine();
        let ctx = QueryContext::arc();
        struct TestCase {
            sql: String,
            sink_table_name: String,
            column_schemas: Vec<ColumnSchema>,
            primary_keys: Vec<String>,
            time_index: String,
        }

        let update_at_schema = ColumnSchema::new(
            AUTO_CREATED_UPDATE_AT_TS_COL,
            ConcreteDataType::timestamp_millisecond_datatype(),
            true,
        );

        let ts_placeholder_schema = ColumnSchema::new(
            AUTO_CREATED_PLACEHOLDER_TS_COL,
            ConcreteDataType::timestamp_millisecond_datatype(),
            false,
        )
        .with_time_index(true);

        let testcases = vec![
            TestCase {
                sql: "SELECT number, ts FROM numbers_with_ts".to_string(),
                sink_table_name: "new_table".to_string(),
                column_schemas: vec![
                    ColumnSchema::new("number", ConcreteDataType::uint32_datatype(), true),
                    ColumnSchema::new(
                        "ts",
                        ConcreteDataType::timestamp_millisecond_datatype(),
                        true,
                    ),
                    update_at_schema.clone(),
                    ts_placeholder_schema.clone(),
                ],
                primary_keys: vec![],
                time_index: AUTO_CREATED_PLACEHOLDER_TS_COL.to_string(),
            },
            TestCase {
                sql: "SELECT number, max(ts) FROM numbers_with_ts GROUP BY number".to_string(),
                sink_table_name: "new_table".to_string(),
                column_schemas: vec![
                    ColumnSchema::new("number", ConcreteDataType::uint32_datatype(), true),
                    ColumnSchema::new(
                        "max(numbers_with_ts.ts)",
                        ConcreteDataType::timestamp_millisecond_datatype(),
                        true,
                    ),
                    update_at_schema.clone(),
                    ts_placeholder_schema.clone(),
                ],
                primary_keys: vec!["number".to_string()],
                time_index: AUTO_CREATED_PLACEHOLDER_TS_COL.to_string(),
            },
            TestCase {
                sql: "SELECT max(number), ts FROM numbers_with_ts GROUP BY ts".to_string(),
                sink_table_name: "new_table".to_string(),
                column_schemas: vec![
                    ColumnSchema::new(
                        "max(numbers_with_ts.number)",
                        ConcreteDataType::uint32_datatype(),
                        true,
                    ),
                    ColumnSchema::new(
                        "ts",
                        ConcreteDataType::timestamp_millisecond_datatype(),
                        false,
                    )
                    .with_time_index(true),
                    update_at_schema.clone(),
                ],
                primary_keys: vec![],
                time_index: "ts".to_string(),
            },
            TestCase {
                sql: "SELECT number, ts FROM numbers_with_ts GROUP BY ts, number".to_string(),
                sink_table_name: "new_table".to_string(),
                column_schemas: vec![
                    ColumnSchema::new("number", ConcreteDataType::uint32_datatype(), true),
                    ColumnSchema::new(
                        "ts",
                        ConcreteDataType::timestamp_millisecond_datatype(),
                        false,
                    )
                    .with_time_index(true),
                    update_at_schema.clone(),
                ],
                primary_keys: vec!["number".to_string()],
                time_index: "ts".to_string(),
            },
        ];

        for tc in testcases {
            let plan = sql_to_df_plan(ctx.clone(), query_engine.clone(), &tc.sql, true)
                .await
                .unwrap();
            let expr = create_table_with_expr(
                &plan,
                &[
                    "greptime".to_string(),
                    "public".to_string(),
                    tc.sink_table_name.clone(),
                ],
            )
            .unwrap();
            let column_schemas = expr
                .column_defs
                .iter()
                .map(|c| try_as_column_schema(c).unwrap())
                .collect::<Vec<_>>();
            assert_eq!(tc.column_schemas, column_schemas);
            assert_eq!(tc.primary_keys, expr.primary_keys);
            assert_eq!(tc.time_index, expr.time_index);
        }
    }
}