flow/batching_mode/task.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeSet, HashSet};
use std::sync::{Arc, RwLock};
use std::time::{Duration, SystemTime, UNIX_EPOCH};

use api::v1::CreateTableExpr;
use arrow_schema::Fields;
use catalog::CatalogManagerRef;
use common_error::ext::BoxedError;
use common_query::logical_plan::breakup_insert_plan;
use common_telemetry::tracing::warn;
use common_telemetry::{debug, info};
use common_time::Timestamp;
use datafusion::optimizer::analyzer::count_wildcard_rule::CountWildcardRule;
use datafusion::optimizer::AnalyzerRule;
use datafusion::sql::unparser::expr_to_sql;
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_common::DFSchemaRef;
use datafusion_expr::{DmlStatement, LogicalPlan, WriteOp};
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnSchema, Schema};
use operator::expr_helper::column_schemas_to_defs;
use query::query_engine::DefaultSerializer;
use query::QueryEngineRef;
use session::context::QueryContextRef;
use snafu::{ensure, OptionExt, ResultExt};
use sql::parser::{ParseOptions, ParserContext};
use sql::statements::statement::Statement;
use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
use tokio::sync::oneshot;
use tokio::sync::oneshot::error::TryRecvError;
use tokio::time::Instant;

use crate::adapter::{AUTO_CREATED_PLACEHOLDER_TS_COL, AUTO_CREATED_UPDATE_AT_TS_COL};
use crate::batching_mode::frontend_client::FrontendClient;
use crate::batching_mode::state::{DirtyTimeWindows, TaskState};
use crate::batching_mode::time_window::TimeWindowExpr;
use crate::batching_mode::utils::{
    get_table_info_df_schema, sql_to_df_plan, AddAutoColumnRewriter, AddFilterRewriter,
    FindGroupByFinalName,
};
use crate::batching_mode::{
    DEFAULT_BATCHING_ENGINE_QUERY_TIMEOUT, MIN_REFRESH_DURATION, SLOW_QUERY_THRESHOLD,
};
use crate::df_optimizer::apply_df_optimizer;
use crate::error::{
    ConvertColumnSchemaSnafu, DatafusionSnafu, ExternalSnafu, InvalidQuerySnafu,
    SubstraitEncodeLogicalPlanSnafu, UnexpectedSnafu,
};
use crate::metrics::{
    METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME, METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY,
};
use crate::{Error, FlowId};

/// The task's config, immutable once created
#[derive(Clone)]
pub struct TaskConfig {
    pub flow_id: FlowId,
    pub query: String,
    /// output schema of the query
    pub output_schema: DFSchemaRef,
    pub time_window_expr: Option<TimeWindowExpr>,
    /// in seconds
    pub expire_after: Option<i64>,
    sink_table_name: [String; 3],
    pub source_table_names: HashSet<[String; 3]>,
    catalog_manager: CatalogManagerRef,
    query_type: QueryType,
}

fn determine_query_type(query: &str, query_ctx: &QueryContextRef) -> Result<QueryType, Error> {
    let stmts =
        ParserContext::create_with_dialect(query, query_ctx.sql_dialect(), ParseOptions::default())
            .map_err(BoxedError::new)
            .context(ExternalSnafu)?;

    ensure!(
        stmts.len() == 1,
        InvalidQuerySnafu {
            reason: format!("Expect only one statement, found {}", stmts.len())
        }
    );
    let stmt = &stmts[0];
    match stmt {
        Statement::Tql(_) => Ok(QueryType::Tql),
        _ => Ok(QueryType::Sql),
    }
}

#[derive(Debug, Clone)]
enum QueryType {
    /// query is a TQL query
    Tql,
    /// query is a SQL query
    Sql,
}

#[derive(Clone)]
pub struct BatchingTask {
    pub config: Arc<TaskConfig>,
    pub state: Arc<RwLock<TaskState>>,
}

impl BatchingTask {
    #[allow(clippy::too_many_arguments)]
    pub fn try_new(
        flow_id: FlowId,
        query: &str,
        plan: LogicalPlan,
        time_window_expr: Option<TimeWindowExpr>,
        expire_after: Option<i64>,
        sink_table_name: [String; 3],
        source_table_names: Vec<[String; 3]>,
        query_ctx: QueryContextRef,
        catalog_manager: CatalogManagerRef,
        shutdown_rx: oneshot::Receiver<()>,
    ) -> Result<Self, Error> {
        Ok(Self {
            config: Arc::new(TaskConfig {
                flow_id,
                query: query.to_string(),
                time_window_expr,
                expire_after,
                sink_table_name,
                source_table_names: source_table_names.into_iter().collect(),
                catalog_manager,
                output_schema: plan.schema().clone(),
                query_type: determine_query_type(query, &query_ctx)?,
            }),
            state: Arc::new(RwLock::new(TaskState::new(query_ctx, shutdown_rx))),
        })
    }

    /// Mark the time window range `(now - expire_after, now)` as dirty (or `(0, now)` if `expire_after` is not set).
    ///
    /// Useful for `flush_flow` to flush the dirty time window ranges.
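    ///
    /// A minimal usage sketch (the surrounding `task`, `engine` and `frontend_client` setup
    /// is assumed, not shown here):
    ///
    /// ```ignore
    /// // Mark every window since the expire horizon as dirty, then generate and
    /// // execute the corresponding insert plan once to flush them.
    /// task.mark_all_windows_as_dirty()?;
    /// task.gen_exec_once(&engine, &frontend_client).await?;
    /// ```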
    pub fn mark_all_windows_as_dirty(&self) -> Result<(), Error> {
        let now = SystemTime::now();
        let now = Timestamp::new_second(
            now.duration_since(UNIX_EPOCH)
                .expect("Time went backwards")
                .as_secs() as _,
        );
        let lower_bound = self
            .config
            .expire_after
            .map(|e| now.sub_duration(Duration::from_secs(e as _)))
            .transpose()
            .map_err(BoxedError::new)
            .context(ExternalSnafu)?
            .unwrap_or(Timestamp::new_second(0));
        debug!(
            "Flow {} mark range ({:?}, {:?}) as dirty",
            self.config.flow_id, lower_bound, now
        );
        self.state
            .write()
            .unwrap()
            .dirty_time_windows
            .add_window(lower_bound, Some(now));
        Ok(())
    }

    /// Create sink table if not exists
    pub async fn check_or_create_sink_table(
        &self,
        engine: &QueryEngineRef,
        frontend_client: &Arc<FrontendClient>,
    ) -> Result<Option<(u32, Duration)>, Error> {
        if !self.is_table_exist(&self.config.sink_table_name).await? {
            let create_table = self.gen_create_table_expr(engine.clone()).await?;
            info!(
                "Try creating sink table(if not exists) with expr: {:?}",
                create_table
            );
            self.create_table(frontend_client, create_table).await?;
            info!(
                "Sink table {}(if not exists) created",
                self.config.sink_table_name.join(".")
            );
        }

        Ok(None)
    }

    async fn is_table_exist(&self, table_name: &[String; 3]) -> Result<bool, Error> {
        self.config
            .catalog_manager
            .table_exists(&table_name[0], &table_name[1], &table_name[2], None)
            .await
            .map_err(BoxedError::new)
            .context(ExternalSnafu)
    }

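    /// Generate the insert plan for the dirty time windows and execute it once,
    /// returning the affected rows and elapsed time (`None` if there was no new data).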
    pub async fn gen_exec_once(
        &self,
        engine: &QueryEngineRef,
        frontend_client: &Arc<FrontendClient>,
    ) -> Result<Option<(u32, Duration)>, Error> {
        if let Some(new_query) = self.gen_insert_plan(engine).await? {
            debug!("Generate new query: {}", new_query);
            self.execute_logical_plan(frontend_client, &new_query).await
        } else {
            debug!("Generate no query");
            Ok(None)
        }
    }

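    /// Generate the insert plan: the (time-window filtered) query wrapped in an `INSERT INTO`
    /// DML node targeting the sink table, or `None` if there is no new data to process.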
    pub async fn gen_insert_plan(
        &self,
        engine: &QueryEngineRef,
    ) -> Result<Option<LogicalPlan>, Error> {
        let (table, df_schema) = get_table_info_df_schema(
            self.config.catalog_manager.clone(),
            self.config.sink_table_name.clone(),
        )
        .await?;

        let new_query = self
            .gen_query_with_time_window(engine.clone(), &table.meta.schema)
            .await?;

        let insert_into = if let Some((new_query, _column_cnt)) = new_query {
            // first check that all columns in the input query exist in the sink table,
            // since INSERT INTO refers to the column names in the record batches generated by the given query
            let table_columns = df_schema
                .columns()
                .into_iter()
                .map(|c| c.name)
                .collect::<BTreeSet<_>>();
            for column in new_query.schema().columns() {
                ensure!(
                    table_columns.contains(column.name()),
                    InvalidQuerySnafu {
                        reason: format!(
                            "Column {} not found in sink table with columns {:?}",
                            column, table_columns
                        ),
                    }
                );
            }
            // `update_at` & the time index placeholder (if present) should have default values
            LogicalPlan::Dml(DmlStatement::new(
                datafusion_common::TableReference::Full {
                    catalog: self.config.sink_table_name[0].clone().into(),
                    schema: self.config.sink_table_name[1].clone().into(),
                    table: self.config.sink_table_name[2].clone().into(),
                },
                df_schema,
                WriteOp::Insert(datafusion_expr::dml::InsertOp::Append),
                Arc::new(new_query),
            ))
        } else {
            return Ok(None);
        };
        let insert_into = insert_into.recompute_schema().context(DatafusionSnafu {
            context: "Failed to recompute schema",
        })?;
        Ok(Some(insert_into))
    }

    pub async fn create_table(
        &self,
        frontend_client: &Arc<FrontendClient>,
        expr: CreateTableExpr,
    ) -> Result<(), Error> {
        let catalog = &self.config.sink_table_name[0];
        let schema = &self.config.sink_table_name[1];
        frontend_client
            .create(expr.clone(), catalog, schema)
            .await?;
        Ok(())
    }

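    /// Encode the given plan as a substrait plan and execute it on a frontend,
    /// returning the number of affected rows and the elapsed time.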
    pub async fn execute_logical_plan(
        &self,
        frontend_client: &Arc<FrontendClient>,
        plan: &LogicalPlan,
    ) -> Result<Option<(u32, Duration)>, Error> {
        let instant = Instant::now();
        let flow_id = self.config.flow_id;

        debug!(
            "Executing flow {flow_id}(expire_after={:?} secs) with query {}",
            self.config.expire_after, &plan
        );

        let catalog = &self.config.sink_table_name[0];
        let schema = &self.config.sink_table_name[1];

        // fix all table refs by making them fully qualified, i.e. "table_name" => "catalog_name.schema_name.table_name"
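        // e.g. a bare `numbers_with_ts` scan becomes `greptime.public.numbers_with_ts`
        // (illustrative names; the actual catalog/schema come from the sink table name)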
        let fixed_plan = plan
            .clone()
            .transform_down_with_subqueries(|p| {
                if let LogicalPlan::TableScan(mut table_scan) = p {
                    let resolved = table_scan.table_name.resolve(catalog, schema);
                    table_scan.table_name = resolved.into();
                    Ok(Transformed::yes(LogicalPlan::TableScan(table_scan)))
                } else {
                    Ok(Transformed::no(p))
                }
            })
            .with_context(|_| DatafusionSnafu {
                context: format!("Failed to fix table ref in logical plan, plan={:?}", plan),
            })?
            .data;

        let expanded_plan = CountWildcardRule::new()
            .analyze(fixed_plan.clone(), &Default::default())
            .with_context(|_| DatafusionSnafu {
                context: format!(
                    "Failed to expand wildcard in logical plan, plan={:?}",
                    fixed_plan
                ),
            })?;

        let plan = expanded_plan;
        let mut peer_desc = None;

        let res = {
            let _timer = METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME
                .with_label_values(&[flow_id.to_string().as_str()])
                .start_timer();

            // hack: special-case the insert logical plan
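            // if the plan is an `INSERT INTO`, `breakup_insert_plan` splits it into the
            // target table and its input plan, so the frontend receives a dedicated
            // `InsertIntoPlan` request rather than a generic substrait logical plan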
            let req = if let Some((insert_to, insert_plan)) =
                breakup_insert_plan(&plan, catalog, schema)
            {
                let message = DFLogicalSubstraitConvertor {}
                    .encode(&insert_plan, DefaultSerializer)
                    .context(SubstraitEncodeLogicalPlanSnafu)?;
                api::v1::greptime_request::Request::Query(api::v1::QueryRequest {
                    query: Some(api::v1::query_request::Query::InsertIntoPlan(
                        api::v1::InsertIntoPlan {
                            table_name: Some(insert_to),
                            logical_plan: message.to_vec(),
                        },
                    )),
                })
            } else {
                let message = DFLogicalSubstraitConvertor {}
                    .encode(&plan, DefaultSerializer)
                    .context(SubstraitEncodeLogicalPlanSnafu)?;

                api::v1::greptime_request::Request::Query(api::v1::QueryRequest {
                    query: Some(api::v1::query_request::Query::LogicalPlan(message.to_vec())),
                })
            };

            frontend_client
                .handle(req, catalog, schema, &mut peer_desc)
                .await
        };

        let elapsed = instant.elapsed();
        if let Ok(affected_rows) = &res {
            debug!(
                "Flow {flow_id} executed, affected_rows: {affected_rows:?}, elapsed: {:?}",
                elapsed
            );
        } else if let Err(err) = &res {
            warn!(
                "Failed to execute Flow {flow_id} on frontend {:?}, result: {err:?}, elapsed: {:?} with query: {}",
                peer_desc, elapsed, &plan
            );
        }

        // record slow query
        if elapsed >= SLOW_QUERY_THRESHOLD {
            warn!(
                "Flow {flow_id} on frontend {:?} executed for {:?} before complete, query: {}",
                peer_desc, elapsed, &plan
            );
            METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY
                .with_label_values(&[
                    flow_id.to_string().as_str(),
                    &peer_desc.unwrap_or_default().to_string(),
                ])
                .observe(elapsed.as_secs_f64());
        }

        self.state
            .write()
            .unwrap()
            .after_query_exec(elapsed, res.is_ok());

        let res = res?;

        Ok(Some((res, elapsed)))
    }

    /// Start executing the query in a loop, breaking when a shutdown signal is received.
    ///
    /// Any error that occurs while executing the query is logged.
    pub async fn start_executing_loop(
        &self,
        engine: QueryEngineRef,
        frontend_client: Arc<FrontendClient>,
    ) {
        loop {
            // first check if a shutdown signal has been received;
            // if so, break the loop
            {
                let mut state = self.state.write().unwrap();
                match state.shutdown_rx.try_recv() {
                    Ok(()) => break,
                    Err(TryRecvError::Closed) => {
                        warn!(
                            "Unexpected shutdown flow {}, shutdown anyway",
                            self.config.flow_id
                        );
                        break;
                    }
                    Err(TryRecvError::Empty) => (),
                }
            }

            let new_query = match self.gen_insert_plan(&engine).await {
                Ok(new_query) => new_query,
                Err(err) => {
                    common_telemetry::error!(err; "Failed to generate query for flow={}", self.config.flow_id);
                    // also sleep for a little while before trying again, to avoid flooding the logs
                    tokio::time::sleep(MIN_REFRESH_DURATION).await;
                    continue;
                }
            };

            let res = if let Some(new_query) = &new_query {
                self.execute_logical_plan(&frontend_client, new_query).await
            } else {
                Ok(None)
            };

            match res {
                // normal execution; sleep for some time before doing the next query
                Ok(Some(_)) => {
                    let sleep_until = {
                        let state = self.state.write().unwrap();

                        state.get_next_start_query_time(
                            self.config.flow_id,
                            &self
                                .config
                                .time_window_expr
                                .as_ref()
                                .and_then(|t| *t.time_window_size()),
                            Some(DEFAULT_BATCHING_ENGINE_QUERY_TIMEOUT),
                        )
                    };
                    tokio::time::sleep_until(sleep_until).await;
                }
                // no new data, sleep for some time before checking for new data
                Ok(None) => {
                    debug!(
                        "Flow id = {:?} found no new data, sleep for {:?} then continue",
                        self.config.flow_id, MIN_REFRESH_DURATION
                    );
                    tokio::time::sleep(MIN_REFRESH_DURATION).await;
                    continue;
                }
                // TODO(discord9): this error should have a better place to go, but for now just print the error; more context is needed
                Err(err) => {
                    match new_query {
                        Some(query) => {
                            common_telemetry::error!(err; "Failed to execute query for flow={} with query: {query}", self.config.flow_id)
                        }
                        None => {
                            common_telemetry::error!(err; "Failed to generate query for flow={}", self.config.flow_id)
                        }
                    }
                    // also sleep for a little while before trying again, to avoid flooding the logs
                    tokio::time::sleep(MIN_REFRESH_DURATION).await;
                }
            }
        }
    }

    /// Generate the create table expression.
    ///
    /// The auto-created table will automatically have an `update_at` Milliseconds DEFAULT now() column appended at the end
    /// (for compatibility with flow streaming mode),
    ///
    /// and it will use the first timestamp column as the time index; all other columns are added as nullable normal columns.
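    ///
    /// For example (mirroring the test cases below), `SELECT number, ts FROM numbers_with_ts`
    /// yields a sink table with nullable `number` and `ts` columns, an appended `update_at`
    /// column, and, since no timestamp column is used as the time index, an extra placeholder
    /// time index column (`AUTO_CREATED_PLACEHOLDER_TS_COL`).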
    async fn gen_create_table_expr(
        &self,
        engine: QueryEngineRef,
    ) -> Result<CreateTableExpr, Error> {
        let query_ctx = self.state.read().unwrap().query_ctx.clone();
        let plan =
            sql_to_df_plan(query_ctx.clone(), engine.clone(), &self.config.query, true).await?;
        create_table_with_expr(&plan, &self.config.sink_table_name)
    }

    /// Merge the dirty time windows and use the first ten of them in the query.
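    ///
    /// Conceptually, for a time window column `ts` the dirty windows become a pushed-down filter
    /// of the form `(ts >= w1_start AND ts < w1_end) OR (ts >= w2_start AND ts < w2_end) OR ...`;
    /// the exact expression is produced by `DirtyTimeWindows::gen_filter_exprs` (the shape shown
    /// here is for illustration only).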
    async fn gen_query_with_time_window(
        &self,
        engine: QueryEngineRef,
        sink_table_schema: &Arc<Schema>,
    ) -> Result<Option<(LogicalPlan, usize)>, Error> {
        let query_ctx = self.state.read().unwrap().query_ctx.clone();
        let start = SystemTime::now();
        let since_the_epoch = start
            .duration_since(UNIX_EPOCH)
            .expect("Time went backwards");
        let low_bound = self
            .config
            .expire_after
            .map(|e| since_the_epoch.as_secs() - e as u64)
            .unwrap_or(u64::MIN);

        let low_bound = Timestamp::new_second(low_bound as i64);
        let schema_len = self.config.output_schema.fields().len();

        let expire_time_window_bound = self
            .config
            .time_window_expr
            .as_ref()
            .map(|expr| expr.eval(low_bound))
            .transpose()?;

        let (Some((Some(l), Some(u))), QueryType::Sql) =
            (expire_time_window_bound, &self.config.query_type)
        else {
            // either there is no time window or this is not a SQL query, so just use the original query;
            // use sink_table_meta to also add the `update_at` and `__ts_placeholder` columns' values to the query, for compatibility reasons
            debug!(
                "Flow id = {:?}, can't get window size: precise_lower_bound={expire_time_window_bound:?}, using the same query", self.config.flow_id
            );
            // also clean the dirty time windows; these could come from create flow's check_execute
            self.state.write().unwrap().dirty_time_windows.clean();

            // TODO(discord9): don't add auto columns for TQL queries?
            let mut add_auto_column = AddAutoColumnRewriter::new(sink_table_schema.clone());

            let plan = sql_to_df_plan(query_ctx.clone(), engine.clone(), &self.config.query, false)
                .await?;

            let plan = plan
                .clone()
                .rewrite(&mut add_auto_column)
                .with_context(|_| DatafusionSnafu {
                    context: format!("Failed to rewrite plan:\n {}\n", plan),
                })?
                .data;
            let schema_len = plan.schema().fields().len();

            // since no time window lower/upper bound was found, just return the original query (with auto columns)
            return Ok(Some((plan, schema_len)));
        };

        debug!(
            "Flow id = {:?}, found time window: precise_lower_bound={:?}, precise_upper_bound={:?}",
            self.config.flow_id, l, u
        );
        let window_size = u.sub(&l).with_context(|| UnexpectedSnafu {
            reason: format!("Can't get window size from {u:?} - {l:?}"),
        })?;
        let col_name = self
            .config
            .time_window_expr
            .as_ref()
            .map(|expr| expr.column_name.clone())
            .with_context(|| UnexpectedSnafu {
                reason: format!(
                    "Flow id={:?}, Failed to get column name from time window expr",
                    self.config.flow_id
                ),
            })?;

        let expr = self
            .state
            .write()
            .unwrap()
            .dirty_time_windows
            .gen_filter_exprs(
                &col_name,
                Some(l),
                window_size,
                DirtyTimeWindows::MAX_FILTER_NUM,
                self.config.flow_id,
                Some(self),
            )?;

        debug!(
            "Flow id={:?}, Generated filter expr: {:?}",
            self.config.flow_id,
            expr.as_ref()
                .map(|expr| expr_to_sql(expr).with_context(|_| DatafusionSnafu {
                    context: format!("Failed to generate filter expr from {expr:?}"),
                }))
                .transpose()?
                .map(|s| s.to_string())
        );

        let Some(expr) = expr else {
            // no new data, hence no need to update
            debug!("Flow id={:?}, no new data, not update", self.config.flow_id);
            return Ok(None);
        };

        let mut add_filter = AddFilterRewriter::new(expr);
        let mut add_auto_column = AddAutoColumnRewriter::new(sink_table_schema.clone());

        let plan =
            sql_to_df_plan(query_ctx.clone(), engine.clone(), &self.config.query, false).await?;
        let rewrite = plan
            .clone()
            .rewrite(&mut add_filter)
            .and_then(|p| p.data.rewrite(&mut add_auto_column))
            .with_context(|_| DatafusionSnafu {
                context: format!("Failed to rewrite plan:\n {}\n", plan),
            })?
            .data;
        // only apply the optimizer after the complex rewrites are done
        let new_plan = apply_df_optimizer(rewrite).await?;

        Ok(Some((new_plan, schema_len)))
    }
}

// Auto-created tables have an auto-added `update_at` column, and optionally an `AUTO_CREATED_PLACEHOLDER_TS_COL` column as a time index placeholder if no timestamp column is specified
// TODO(discord9): for now no default value is set for the auto-added columns, for compatibility with streaming mode, but this might change in favor of simpler code?
fn create_table_with_expr(
    plan: &LogicalPlan,
    sink_table_name: &[String; 3],
) -> Result<CreateTableExpr, Error> {
    let fields = plan.schema().fields();
    let (first_time_stamp, primary_keys) = build_primary_key_constraint(plan, fields)?;

    let mut column_schemas = Vec::new();
    for field in fields {
        let name = field.name();
        let ty = ConcreteDataType::from_arrow_type(field.data_type());
        let col_schema = if first_time_stamp == Some(name.clone()) {
            ColumnSchema::new(name, ty, false).with_time_index(true)
        } else {
            ColumnSchema::new(name, ty, true)
        };
        column_schemas.push(col_schema);
    }

    let update_at_schema = ColumnSchema::new(
        AUTO_CREATED_UPDATE_AT_TS_COL,
        ConcreteDataType::timestamp_millisecond_datatype(),
        true,
    );
    column_schemas.push(update_at_schema);

    let time_index = if let Some(time_index) = first_time_stamp {
        time_index
    } else {
        column_schemas.push(
            ColumnSchema::new(
                AUTO_CREATED_PLACEHOLDER_TS_COL,
                ConcreteDataType::timestamp_millisecond_datatype(),
                false,
            )
            .with_time_index(true),
        );
        AUTO_CREATED_PLACEHOLDER_TS_COL.to_string()
    };

    let column_defs =
        column_schemas_to_defs(column_schemas, &primary_keys).context(ConvertColumnSchemaSnafu)?;
    Ok(CreateTableExpr {
        catalog_name: sink_table_name[0].clone(),
        schema_name: sink_table_name[1].clone(),
        table_name: sink_table_name[2].clone(),
        desc: "Auto created table by flow engine".to_string(),
        column_defs,
        time_index,
        primary_keys,
        create_if_not_exists: true,
        table_options: Default::default(),
        table_id: None,
        engine: "mito".to_string(),
    })
}

/// Returns the first timestamp column that appears in the GROUP BY clause, together with the
/// other columns that are also in the GROUP BY clause.
///
/// # Returns
///
/// * `Option<String>` - the first timestamp column in the GROUP BY clause
/// * `Vec<String>` - the other columns that are also in the GROUP BY clause
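///
/// For example (consistent with the test cases below), for
/// `SELECT number, ts FROM numbers_with_ts GROUP BY ts, number` this returns
/// `(Some("ts"), vec!["number"])`: `ts` is the first timestamp column in the GROUP BY
/// clause, and the remaining group-by column `number` becomes a primary key.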
fn build_primary_key_constraint(
    plan: &LogicalPlan,
    schema: &Fields,
) -> Result<(Option<String>, Vec<String>), Error> {
    let mut pk_names = FindGroupByFinalName::default();

    plan.visit(&mut pk_names)
        .with_context(|_| DatafusionSnafu {
            context: format!("Can't find aggr expr in plan {plan:?}"),
        })?;

    // if there is no group by clause, return empty
    let pk_final_names = pk_names.get_group_expr_names().unwrap_or_default();
    if pk_final_names.is_empty() {
        return Ok((None, Vec::new()));
    }

    let all_pk_cols: Vec<_> = schema
        .iter()
        .filter(|f| pk_final_names.contains(f.name()))
        .map(|f| f.name().clone())
        .collect();
    // the auto-created table uses the first timestamp column in the group by clause as its time index
    let first_time_stamp = schema
        .iter()
        .find(|f| {
            all_pk_cols.contains(&f.name().clone())
                && ConcreteDataType::from_arrow_type(f.data_type()).is_timestamp()
        })
        .map(|f| f.name().clone());

    let all_pk_cols: Vec<_> = all_pk_cols
        .into_iter()
        .filter(|col| first_time_stamp != Some(col.to_string()))
        .collect();

    Ok((first_time_stamp, all_pk_cols))
}

#[cfg(test)]
mod test {
    use api::v1::column_def::try_as_column_schema;
    use pretty_assertions::assert_eq;
    use session::context::QueryContext;

    use super::*;
    use crate::test_utils::create_test_query_engine;

    #[tokio::test]
    async fn test_gen_create_table_sql() {
        let query_engine = create_test_query_engine();
        let ctx = QueryContext::arc();
        struct TestCase {
            sql: String,
            sink_table_name: String,
            column_schemas: Vec<ColumnSchema>,
            primary_keys: Vec<String>,
            time_index: String,
        }

        let update_at_schema = ColumnSchema::new(
            AUTO_CREATED_UPDATE_AT_TS_COL,
            ConcreteDataType::timestamp_millisecond_datatype(),
            true,
        );

        let ts_placeholder_schema = ColumnSchema::new(
            AUTO_CREATED_PLACEHOLDER_TS_COL,
            ConcreteDataType::timestamp_millisecond_datatype(),
            false,
        )
        .with_time_index(true);

        let testcases = vec![
            TestCase {
                sql: "SELECT number, ts FROM numbers_with_ts".to_string(),
                sink_table_name: "new_table".to_string(),
                column_schemas: vec![
                    ColumnSchema::new("number", ConcreteDataType::uint32_datatype(), true),
                    ColumnSchema::new(
                        "ts",
                        ConcreteDataType::timestamp_millisecond_datatype(),
                        true,
                    ),
                    update_at_schema.clone(),
                    ts_placeholder_schema.clone(),
                ],
                primary_keys: vec![],
                time_index: AUTO_CREATED_PLACEHOLDER_TS_COL.to_string(),
            },
            TestCase {
                sql: "SELECT number, max(ts) FROM numbers_with_ts GROUP BY number".to_string(),
                sink_table_name: "new_table".to_string(),
                column_schemas: vec![
                    ColumnSchema::new("number", ConcreteDataType::uint32_datatype(), true),
                    ColumnSchema::new(
                        "max(numbers_with_ts.ts)",
                        ConcreteDataType::timestamp_millisecond_datatype(),
                        true,
                    ),
                    update_at_schema.clone(),
                    ts_placeholder_schema.clone(),
                ],
                primary_keys: vec!["number".to_string()],
                time_index: AUTO_CREATED_PLACEHOLDER_TS_COL.to_string(),
            },
            TestCase {
                sql: "SELECT max(number), ts FROM numbers_with_ts GROUP BY ts".to_string(),
                sink_table_name: "new_table".to_string(),
                column_schemas: vec![
                    ColumnSchema::new(
                        "max(numbers_with_ts.number)",
                        ConcreteDataType::uint32_datatype(),
                        true,
                    ),
                    ColumnSchema::new(
                        "ts",
                        ConcreteDataType::timestamp_millisecond_datatype(),
                        false,
                    )
                    .with_time_index(true),
                    update_at_schema.clone(),
                ],
                primary_keys: vec![],
                time_index: "ts".to_string(),
            },
            TestCase {
                sql: "SELECT number, ts FROM numbers_with_ts GROUP BY ts, number".to_string(),
                sink_table_name: "new_table".to_string(),
                column_schemas: vec![
                    ColumnSchema::new("number", ConcreteDataType::uint32_datatype(), true),
                    ColumnSchema::new(
                        "ts",
                        ConcreteDataType::timestamp_millisecond_datatype(),
                        false,
                    )
                    .with_time_index(true),
                    update_at_schema.clone(),
                ],
                primary_keys: vec!["number".to_string()],
                time_index: "ts".to_string(),
            },
        ];

        for tc in testcases {
            let plan = sql_to_df_plan(ctx.clone(), query_engine.clone(), &tc.sql, true)
                .await
                .unwrap();
            let expr = create_table_with_expr(
                &plan,
                &[
                    "greptime".to_string(),
                    "public".to_string(),
                    tc.sink_table_name.clone(),
                ],
            )
            .unwrap();
            // TODO(discord9): assert expr
            let column_schemas = expr
                .column_defs
                .iter()
                .map(|c| try_as_column_schema(c).unwrap())
                .collect::<Vec<_>>();
            assert_eq!(tc.column_schemas, column_schemas);
            assert_eq!(tc.primary_keys, expr.primary_keys);
            assert_eq!(tc.time_index, expr.time_index);
        }
    }
}