flow/batching_mode/task.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeSet, HashSet};
use std::ops::Deref;
use std::sync::{Arc, RwLock};
use std::time::{Duration, SystemTime, UNIX_EPOCH};

use api::v1::CreateTableExpr;
use arrow_schema::Fields;
use catalog::CatalogManagerRef;
use common_error::ext::BoxedError;
use common_query::logical_plan::breakup_insert_plan;
use common_telemetry::tracing::warn;
use common_telemetry::{debug, info};
use common_time::Timestamp;
use datafusion::optimizer::analyzer::count_wildcard_rule::CountWildcardRule;
use datafusion::optimizer::AnalyzerRule;
use datafusion::sql::unparser::expr_to_sql;
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_expr::{DmlStatement, LogicalPlan, WriteOp};
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnSchema, Schema};
use operator::expr_helper::column_schemas_to_defs;
use query::query_engine::DefaultSerializer;
use query::QueryEngineRef;
use session::context::QueryContextRef;
use snafu::{ensure, OptionExt, ResultExt};
use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
use tokio::sync::oneshot;
use tokio::sync::oneshot::error::TryRecvError;
use tokio::time::Instant;

use crate::adapter::{AUTO_CREATED_PLACEHOLDER_TS_COL, AUTO_CREATED_UPDATE_AT_TS_COL};
use crate::batching_mode::frontend_client::FrontendClient;
use crate::batching_mode::state::TaskState;
use crate::batching_mode::time_window::TimeWindowExpr;
use crate::batching_mode::utils::{
    get_table_info_df_schema, sql_to_df_plan, AddAutoColumnRewriter, AddFilterRewriter,
    FindGroupByFinalName,
};
use crate::batching_mode::{
    DEFAULT_BATCHING_ENGINE_QUERY_TIMEOUT, MIN_REFRESH_DURATION, SLOW_QUERY_THRESHOLD,
};
use crate::df_optimizer::apply_df_optimizer;
use crate::error::{
    ConvertColumnSchemaSnafu, DatafusionSnafu, ExternalSnafu, InvalidQuerySnafu,
    SubstraitEncodeLogicalPlanSnafu, UnexpectedSnafu,
};
use crate::metrics::{
    METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME, METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY,
};
use crate::{Error, FlowId};

/// The task's config, immutable once created
#[derive(Clone)]
pub struct TaskConfig {
    pub flow_id: FlowId,
    pub query: String,
    plan: Arc<LogicalPlan>,
    pub time_window_expr: Option<TimeWindowExpr>,
    /// in seconds
    pub expire_after: Option<i64>,
    sink_table_name: [String; 3],
    pub source_table_names: HashSet<[String; 3]>,
    catalog_manager: CatalogManagerRef,
}

#[derive(Clone)]
pub struct BatchingTask {
    pub config: Arc<TaskConfig>,
    pub state: Arc<RwLock<TaskState>>,
}

impl BatchingTask {
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        flow_id: FlowId,
        query: &str,
        plan: LogicalPlan,
        time_window_expr: Option<TimeWindowExpr>,
        expire_after: Option<i64>,
        sink_table_name: [String; 3],
        source_table_names: Vec<[String; 3]>,
        query_ctx: QueryContextRef,
        catalog_manager: CatalogManagerRef,
        shutdown_rx: oneshot::Receiver<()>,
    ) -> Self {
        Self {
            config: Arc::new(TaskConfig {
                flow_id,
                query: query.to_string(),
                plan: Arc::new(plan),
                time_window_expr,
                expire_after,
                sink_table_name,
                source_table_names: source_table_names.into_iter().collect(),
                catalog_manager,
            }),
            state: Arc::new(RwLock::new(TaskState::new(query_ctx, shutdown_rx))),
        }
    }

    /// Mark the time window range `(now - expire_after, now)` as dirty
    /// (or `(0, now)` if `expire_after` is not set).
    ///
    /// Useful for `flush_flow` to flush the dirty time window ranges.
    pub fn mark_all_windows_as_dirty(&self) -> Result<(), Error> {
        let now = SystemTime::now();
        let now = Timestamp::new_second(
            now.duration_since(UNIX_EPOCH)
                .expect("Time went backwards")
                .as_secs() as _,
        );
        let lower_bound = self
            .config
            .expire_after
            .map(|e| now.sub_duration(Duration::from_secs(e as _)))
            .transpose()
            .map_err(BoxedError::new)
            .context(ExternalSnafu)?
            .unwrap_or(Timestamp::new_second(0));
        debug!(
            "Flow {} mark range ({:?}, {:?}) as dirty",
            self.config.flow_id, lower_bound, now
        );
        self.state
            .write()
            .unwrap()
            .dirty_time_windows
            .add_window(lower_bound, Some(now));
        Ok(())
    }

    /// Create the sink table if it does not exist
    pub async fn check_or_create_sink_table(
        &self,
        engine: &QueryEngineRef,
        frontend_client: &Arc<FrontendClient>,
    ) -> Result<Option<(u32, Duration)>, Error> {
        if !self.is_table_exist(&self.config.sink_table_name).await? {
            let create_table = self.gen_create_table_expr(engine.clone()).await?;
            info!(
                "Try creating sink table(if not exists) with expr: {:?}",
                create_table
            );
            self.create_table(frontend_client, create_table).await?;
            info!(
                "Sink table {}(if not exists) created",
                self.config.sink_table_name.join(".")
            );
        }

        Ok(None)
    }

    async fn is_table_exist(&self, table_name: &[String; 3]) -> Result<bool, Error> {
        self.config
            .catalog_manager
            .table_exists(&table_name[0], &table_name[1], &table_name[2], None)
            .await
            .map_err(BoxedError::new)
            .context(ExternalSnafu)
    }

    pub async fn gen_exec_once(
        &self,
        engine: &QueryEngineRef,
        frontend_client: &Arc<FrontendClient>,
    ) -> Result<Option<(u32, Duration)>, Error> {
        if let Some(new_query) = self.gen_insert_plan(engine).await? {
            debug!("Generate new query: {}", new_query);
            self.execute_logical_plan(frontend_client, &new_query).await
        } else {
            debug!("Generate no query");
            Ok(None)
        }
    }

    pub async fn gen_insert_plan(
        &self,
        engine: &QueryEngineRef,
    ) -> Result<Option<LogicalPlan>, Error> {
        let (table, df_schema) = get_table_info_df_schema(
            self.config.catalog_manager.clone(),
            self.config.sink_table_name.clone(),
        )
        .await?;
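        // rewrite the stored plan with filters for the dirty time windows (and auto columns);
        // `None` here means there is no new data to process, so no insert plan is generated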
        let new_query = self
            .gen_query_with_time_window(engine.clone(), &table.meta.schema)
            .await?;

        let insert_into = if let Some((new_query, _column_cnt)) = new_query {
            // first check that all columns in the input query exist in the sink table,
            // since INSERT INTO refers to the column names in the record batches generated by the given query
            let table_columns = df_schema
                .columns()
                .into_iter()
                .map(|c| c.name)
                .collect::<BTreeSet<_>>();
            for column in new_query.schema().columns() {
                ensure!(
                    table_columns.contains(column.name()),
                    InvalidQuerySnafu {
                        reason: format!(
                            "Column {} not found in sink table with columns {:?}",
                            column, table_columns
                        ),
                    }
                );
            }
            // update_at & the time index placeholder (if present) should have default values
            LogicalPlan::Dml(DmlStatement::new(
                datafusion_common::TableReference::Full {
                    catalog: self.config.sink_table_name[0].clone().into(),
                    schema: self.config.sink_table_name[1].clone().into(),
                    table: self.config.sink_table_name[2].clone().into(),
                },
                df_schema,
                WriteOp::Insert(datafusion_expr::dml::InsertOp::Append),
                Arc::new(new_query),
            ))
        } else {
            return Ok(None);
        };
        let insert_into = insert_into.recompute_schema().context(DatafusionSnafu {
            context: "Failed to recompute schema",
        })?;
        Ok(Some(insert_into))
    }

    pub async fn create_table(
        &self,
        frontend_client: &Arc<FrontendClient>,
        expr: CreateTableExpr,
    ) -> Result<(), Error> {
        let catalog = &self.config.sink_table_name[0];
        let schema = &self.config.sink_table_name[1];
        frontend_client
            .create(expr.clone(), catalog, schema)
            .await?;
        Ok(())
    }

    pub async fn execute_logical_plan(
        &self,
        frontend_client: &Arc<FrontendClient>,
        plan: &LogicalPlan,
    ) -> Result<Option<(u32, Duration)>, Error> {
        let instant = Instant::now();
        let flow_id = self.config.flow_id;

        debug!(
            "Executing flow {flow_id}(expire_after={:?} secs) with query {}",
            self.config.expire_after, &plan
        );

        let catalog = &self.config.sink_table_name[0];
        let schema = &self.config.sink_table_name[1];

        // fix all table refs by making them fully qualified, i.e. "table_name" => "catalog_name.schema_name.table_name"
        let fixed_plan = plan
            .clone()
            .transform_down_with_subqueries(|p| {
                if let LogicalPlan::TableScan(mut table_scan) = p {
                    let resolved = table_scan.table_name.resolve(catalog, schema);
                    table_scan.table_name = resolved.into();
                    Ok(Transformed::yes(LogicalPlan::TableScan(table_scan)))
                } else {
                    Ok(Transformed::no(p))
                }
            })
            .with_context(|_| DatafusionSnafu {
                context: format!("Failed to fix table ref in logical plan, plan={:?}", plan),
            })?
            .data;
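        // expand wildcard expressions (e.g. `count(*)`) into concrete ones before the plan is
        // encoded and sent to the frontend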
        let expanded_plan = CountWildcardRule::new()
            .analyze(fixed_plan.clone(), &Default::default())
            .with_context(|_| DatafusionSnafu {
                context: format!(
                    "Failed to expand wildcard in logical plan, plan={:?}",
                    fixed_plan
                ),
            })?;

        let plan = expanded_plan;
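        // which frontend node ended up handling the request; filled in by the frontend client
        // and only used for the warning logs and the slow-query metric below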
        let mut peer_desc = None;

        let res = {
            let _timer = METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME
                .with_label_values(&[flow_id.to_string().as_str()])
                .start_timer();

            // hack: special handling for the insert logical plan
            let req = if let Some((insert_to, insert_plan)) =
                breakup_insert_plan(&plan, catalog, schema)
            {
                let message = DFLogicalSubstraitConvertor {}
                    .encode(&insert_plan, DefaultSerializer)
                    .context(SubstraitEncodeLogicalPlanSnafu)?;
                api::v1::greptime_request::Request::Query(api::v1::QueryRequest {
                    query: Some(api::v1::query_request::Query::InsertIntoPlan(
                        api::v1::InsertIntoPlan {
                            table_name: Some(insert_to),
                            logical_plan: message.to_vec(),
                        },
                    )),
                })
            } else {
                let message = DFLogicalSubstraitConvertor {}
                    .encode(&plan, DefaultSerializer)
                    .context(SubstraitEncodeLogicalPlanSnafu)?;

                api::v1::greptime_request::Request::Query(api::v1::QueryRequest {
                    query: Some(api::v1::query_request::Query::LogicalPlan(message.to_vec())),
                })
            };

            frontend_client
                .handle(req, catalog, schema, &mut peer_desc)
                .await
        };

        let elapsed = instant.elapsed();
        if let Ok(affected_rows) = &res {
            debug!(
                "Flow {flow_id} executed, affected_rows: {affected_rows:?}, elapsed: {:?}",
                elapsed
            );
        } else if let Err(err) = &res {
            warn!(
                "Failed to execute Flow {flow_id} on frontend {:?}, result: {err:?}, elapsed: {:?} with query: {}",
                peer_desc, elapsed, &plan
            );
        }

        // record slow queries
        if elapsed >= SLOW_QUERY_THRESHOLD {
            warn!(
                "Flow {flow_id} on frontend {:?} executed for {:?} before complete, query: {}",
                peer_desc, elapsed, &plan
            );
            METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY
                .with_label_values(&[
                    flow_id.to_string().as_str(),
                    &plan.to_string(),
                    &peer_desc.unwrap_or_default().to_string(),
                ])
                .observe(elapsed.as_secs_f64());
        }

        self.state
            .write()
            .unwrap()
            .after_query_exec(elapsed, res.is_ok());

        let res = res?;

        Ok(Some((res, elapsed)))
    }

    /// Start executing the query in a loop, breaking when a shutdown signal is received.
    ///
    /// Any error during query execution is logged.
    pub async fn start_executing_loop(
        &self,
        engine: QueryEngineRef,
        frontend_client: Arc<FrontendClient>,
    ) {
        loop {
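            // each iteration: generate an insert plan for the current dirty time windows and
            // execute it on the frontend, then wait until the next scheduled run; back off for
            // `MIN_REFRESH_DURATION` when there is no new data or when an error occurs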
            let mut new_query = None;
            let mut gen_and_exec = async || {
                new_query = self.gen_insert_plan(&engine).await?;
                if let Some(new_query) = &new_query {
                    self.execute_logical_plan(&frontend_client, new_query).await
                } else {
                    Ok(None)
                }
            };
            match gen_and_exec().await {
                // normal execution, sleep for some time before doing the next query
                Ok(Some(_)) => {
                    let sleep_until = {
                        let mut state = self.state.write().unwrap();
                        match state.shutdown_rx.try_recv() {
                            Ok(()) => break,
                            Err(TryRecvError::Closed) => {
                                warn!(
                                    "Unexpected shutdown flow {}, shutdown anyway",
                                    self.config.flow_id
                                );
                                break;
                            }
                            Err(TryRecvError::Empty) => (),
                        }
                        state.get_next_start_query_time(
                            self.config.flow_id,
                            Some(DEFAULT_BATCHING_ENGINE_QUERY_TIMEOUT),
                        )
                    };
                    tokio::time::sleep_until(sleep_until).await;
                }
                // no new data, sleep for some time before checking for new data again
                Ok(None) => {
                    debug!(
                        "Flow id = {:?} found no new data, sleep for {:?} then continue",
                        self.config.flow_id, MIN_REFRESH_DURATION
                    );
                    tokio::time::sleep(MIN_REFRESH_DURATION).await;
                    continue;
                }
                // TODO(discord9): this error should have a better place to go, but for now just log it; more context is also needed
                Err(err) => {
                    match new_query {
                        Some(query) => {
                            common_telemetry::error!(err; "Failed to execute query for flow={} with query: {query}", self.config.flow_id)
                        }
                        None => {
                            common_telemetry::error!(err; "Failed to generate query for flow={}", self.config.flow_id)
                        }
                    }
                    // also sleep for a little while before trying again to avoid flooding the logs
                    tokio::time::sleep(MIN_REFRESH_DURATION).await;
                }
            }
        }
    }

    /// Generate the CREATE TABLE expression for the sink table.
    ///
    /// The auto-created table will have an `update_at` millisecond-timestamp column (DEFAULT now())
    /// appended at the end (for compatibility with flow streaming mode).
    ///
    /// It uses the first timestamp column as the time index; all other columns are added as
    /// nullable normal columns.
    async fn gen_create_table_expr(
        &self,
        engine: QueryEngineRef,
    ) -> Result<CreateTableExpr, Error> {
        let query_ctx = self.state.read().unwrap().query_ctx.clone();
        let plan =
            sql_to_df_plan(query_ctx.clone(), engine.clone(), &self.config.query, true).await?;
        create_table_with_expr(&plan, &self.config.sink_table_name)
    }

    /// Will merge and use the first ten time windows in the query
    async fn gen_query_with_time_window(
        &self,
        engine: QueryEngineRef,
        sink_table_schema: &Arc<Schema>,
    ) -> Result<Option<(LogicalPlan, usize)>, Error> {
        let query_ctx = self.state.read().unwrap().query_ctx.clone();
        let start = SystemTime::now();
        let since_the_epoch = start
            .duration_since(UNIX_EPOCH)
            .expect("Time went backwards");
        let low_bound = self
            .config
            .expire_after
            .map(|e| since_the_epoch.as_secs() - e as u64)
            .unwrap_or(u64::MIN);

        let low_bound = Timestamp::new_second(low_bound as i64);
        let schema_len = self.config.plan.schema().fields().len();

        let expire_time_window_bound = self
            .config
            .time_window_expr
            .as_ref()
            .map(|expr| expr.eval(low_bound))
            .transpose()?;
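        // the time window expr (if any) evaluated at the expiration lower bound; both bounds are
        // needed to derive the window size used to turn dirty time windows into filter
        // expressions, otherwise the original query is used as-is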
        let new_plan = {
            let expr = {
                match expire_time_window_bound {
                    Some((Some(l), Some(u))) => {
                        let window_size = u.sub(&l).with_context(|| UnexpectedSnafu {
                            reason: format!("Can't get window size from {u:?} - {l:?}"),
                        })?;
                        let col_name = self
                            .config
                            .time_window_expr
                            .as_ref()
                            .map(|expr| expr.column_name.clone())
                            .with_context(|| UnexpectedSnafu {
                                reason: format!(
                                    "Flow id={:?}, Failed to get column name from time window expr",
                                    self.config.flow_id
                                ),
                            })?;

                        self.state
                            .write()
                            .unwrap()
                            .dirty_time_windows
                            .gen_filter_exprs(
                                &col_name,
                                Some(l),
                                window_size,
                                self.config.flow_id,
                                Some(self),
                            )?
                    }
                    _ => {
                        // use sink_table_meta to also add the `update_at` and `__ts_placeholder`
                        // columns' values to the query, for compatibility reasons
                        debug!(
                            "Flow id = {:?}, can't get window size: precise_lower_bound={expire_time_window_bound:?}, using the same query", self.config.flow_id
                        );
                        // clean the dirty time windows too; these could come from create flow's check_execute
                        self.state.write().unwrap().dirty_time_windows.clean();

                        let mut add_auto_column =
                            AddAutoColumnRewriter::new(sink_table_schema.clone());
                        let plan = self
                            .config
                            .plan
                            .deref()
                            .clone()
                            .rewrite(&mut add_auto_column)
                            .with_context(|_| DatafusionSnafu {
                                context: format!(
                                    "Failed to rewrite plan:\n {}\n",
                                    self.config.plan
                                ),
                            })?
                            .data;
                        let schema_len = plan.schema().fields().len();

                        // since no time window lower/upper bound is found, just return the original query (with auto columns)
                        return Ok(Some((plan, schema_len)));
                    }
                }
            };

            debug!(
                "Flow id={:?}, Generated filter expr: {:?}",
                self.config.flow_id,
                expr.as_ref()
                    .map(|expr| expr_to_sql(expr).with_context(|_| DatafusionSnafu {
                        context: format!("Failed to generate filter expr from {expr:?}"),
                    }))
                    .transpose()?
                    .map(|s| s.to_string())
            );

            let Some(expr) = expr else {
                // no new data, hence no need to update
                debug!("Flow id={:?}, no new data, not update", self.config.flow_id);
                return Ok(None);
            };

            // TODO(discord9): add auto columns or not? This might break compatibility for sink tables auto-created before this, but that's ok, right?

            let mut add_filter = AddFilterRewriter::new(expr);
            let mut add_auto_column = AddAutoColumnRewriter::new(sink_table_schema.clone());

            let plan = sql_to_df_plan(query_ctx.clone(), engine.clone(), &self.config.query, false)
                .await?;
            let rewrite = plan
                .clone()
                .rewrite(&mut add_filter)
                .and_then(|p| p.data.rewrite(&mut add_auto_column))
                .with_context(|_| DatafusionSnafu {
                    context: format!("Failed to rewrite plan:\n {}\n", plan),
                })?
                .data;
            // only apply the optimizer after the complex rewrite is done
            apply_df_optimizer(rewrite).await?
        };

        Ok(Some((new_plan, schema_len)))
    }
}

// An auto-created table has an auto-added `update_at` column, and optionally an
// `AUTO_CREATED_PLACEHOLDER_TS_COL` column as a time index placeholder if no timestamp column is specified
// TODO(discord9): for now no default value is set for the auto-added columns, for compatibility with streaming mode, but this might change in favor of simpler code?
fn create_table_with_expr(
    plan: &LogicalPlan,
    sink_table_name: &[String; 3],
) -> Result<CreateTableExpr, Error> {
    let fields = plan.schema().fields();
    let (first_time_stamp, primary_keys) = build_primary_key_constraint(plan, fields)?;
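    // the first timestamp column in the GROUP BY clause (if any) becomes the time index,
    // and the remaining GROUP BY columns become the primary keys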
    let mut column_schemas = Vec::new();
    for field in fields {
        let name = field.name();
        let ty = ConcreteDataType::from_arrow_type(field.data_type());
        let col_schema = if first_time_stamp == Some(name.clone()) {
            ColumnSchema::new(name, ty, false).with_time_index(true)
        } else {
            ColumnSchema::new(name, ty, true)
        };
        column_schemas.push(col_schema);
    }

    let update_at_schema = ColumnSchema::new(
        AUTO_CREATED_UPDATE_AT_TS_COL,
        ConcreteDataType::timestamp_millisecond_datatype(),
        true,
    );
    column_schemas.push(update_at_schema);

    let time_index = if let Some(time_index) = first_time_stamp {
        time_index
    } else {
        column_schemas.push(
            ColumnSchema::new(
                AUTO_CREATED_PLACEHOLDER_TS_COL,
                ConcreteDataType::timestamp_millisecond_datatype(),
                false,
            )
            .with_time_index(true),
        );
        AUTO_CREATED_PLACEHOLDER_TS_COL.to_string()
    };

    let column_defs =
        column_schemas_to_defs(column_schemas, &primary_keys).context(ConvertColumnSchemaSnafu)?;
    Ok(CreateTableExpr {
        catalog_name: sink_table_name[0].clone(),
        schema_name: sink_table_name[1].clone(),
        table_name: sink_table_name[2].clone(),
        desc: "Auto created table by flow engine".to_string(),
        column_defs,
        time_index,
        primary_keys,
        create_if_not_exists: true,
        table_options: Default::default(),
        table_id: None,
        engine: "mito".to_string(),
    })
}

/// Return the first timestamp column that appears in the GROUP BY clause, plus the other columns
/// that also appear in the GROUP BY clause
///
/// # Returns
///
/// * `Option<String>` - the first timestamp column in the GROUP BY clause
/// * `Vec<String>` - the other columns in the GROUP BY clause
fn build_primary_key_constraint(
    plan: &LogicalPlan,
    schema: &Fields,
) -> Result<(Option<String>, Vec<String>), Error> {
    let mut pk_names = FindGroupByFinalName::default();

    plan.visit(&mut pk_names)
        .with_context(|_| DatafusionSnafu {
            context: format!("Can't find aggr expr in plan {plan:?}"),
        })?;

    // if there is no GROUP BY clause, return empty
    let pk_final_names = pk_names.get_group_expr_names().unwrap_or_default();
    if pk_final_names.is_empty() {
        return Ok((None, Vec::new()));
    }

    let all_pk_cols: Vec<_> = schema
        .iter()
        .filter(|f| pk_final_names.contains(f.name()))
        .map(|f| f.name().clone())
        .collect();
    // auto-created tables use the first timestamp column in the GROUP BY clause as the time index
    let first_time_stamp = schema
        .iter()
        .find(|f| {
            all_pk_cols.contains(&f.name().clone())
                && ConcreteDataType::from_arrow_type(f.data_type()).is_timestamp()
        })
        .map(|f| f.name().clone());

    let all_pk_cols: Vec<_> = all_pk_cols
        .into_iter()
        .filter(|col| first_time_stamp != Some(col.to_string()))
        .collect();

    Ok((first_time_stamp, all_pk_cols))
}

#[cfg(test)]
mod test {
    use api::v1::column_def::try_as_column_schema;
    use pretty_assertions::assert_eq;
    use session::context::QueryContext;

    use super::*;
    use crate::test_utils::create_test_query_engine;

    #[tokio::test]
    async fn test_gen_create_table_sql() {
        let query_engine = create_test_query_engine();
        let ctx = QueryContext::arc();
        struct TestCase {
            sql: String,
            sink_table_name: String,
            column_schemas: Vec<ColumnSchema>,
            primary_keys: Vec<String>,
            time_index: String,
        }

        let update_at_schema = ColumnSchema::new(
            AUTO_CREATED_UPDATE_AT_TS_COL,
            ConcreteDataType::timestamp_millisecond_datatype(),
            true,
        );

        let ts_placeholder_schema = ColumnSchema::new(
            AUTO_CREATED_PLACEHOLDER_TS_COL,
            ConcreteDataType::timestamp_millisecond_datatype(),
            false,
        )
        .with_time_index(true);

        let testcases = vec![
            TestCase {
                sql: "SELECT number, ts FROM numbers_with_ts".to_string(),
                sink_table_name: "new_table".to_string(),
                column_schemas: vec![
                    ColumnSchema::new("number", ConcreteDataType::uint32_datatype(), true),
                    ColumnSchema::new(
                        "ts",
                        ConcreteDataType::timestamp_millisecond_datatype(),
                        true,
                    ),
                    update_at_schema.clone(),
                    ts_placeholder_schema.clone(),
                ],
                primary_keys: vec![],
                time_index: AUTO_CREATED_PLACEHOLDER_TS_COL.to_string(),
            },
            TestCase {
                sql: "SELECT number, max(ts) FROM numbers_with_ts GROUP BY number".to_string(),
                sink_table_name: "new_table".to_string(),
                column_schemas: vec![
                    ColumnSchema::new("number", ConcreteDataType::uint32_datatype(), true),
                    ColumnSchema::new(
                        "max(numbers_with_ts.ts)",
                        ConcreteDataType::timestamp_millisecond_datatype(),
                        true,
                    ),
                    update_at_schema.clone(),
                    ts_placeholder_schema.clone(),
                ],
                primary_keys: vec!["number".to_string()],
                time_index: AUTO_CREATED_PLACEHOLDER_TS_COL.to_string(),
            },
            TestCase {
                sql: "SELECT max(number), ts FROM numbers_with_ts GROUP BY ts".to_string(),
                sink_table_name: "new_table".to_string(),
                column_schemas: vec![
                    ColumnSchema::new(
                        "max(numbers_with_ts.number)",
                        ConcreteDataType::uint32_datatype(),
                        true,
                    ),
                    ColumnSchema::new(
                        "ts",
                        ConcreteDataType::timestamp_millisecond_datatype(),
                        false,
                    )
                    .with_time_index(true),
                    update_at_schema.clone(),
                ],
                primary_keys: vec![],
                time_index: "ts".to_string(),
            },
            TestCase {
                sql: "SELECT number, ts FROM numbers_with_ts GROUP BY ts, number".to_string(),
                sink_table_name: "new_table".to_string(),
                column_schemas: vec![
                    ColumnSchema::new("number", ConcreteDataType::uint32_datatype(), true),
                    ColumnSchema::new(
                        "ts",
                        ConcreteDataType::timestamp_millisecond_datatype(),
                        false,
                    )
                    .with_time_index(true),
                    update_at_schema.clone(),
                ],
                primary_keys: vec!["number".to_string()],
                time_index: "ts".to_string(),
            },
        ];

        for tc in testcases {
            let plan = sql_to_df_plan(ctx.clone(), query_engine.clone(), &tc.sql, true)
                .await
                .unwrap();
            let expr = create_table_with_expr(
                &plan,
                &[
                    "greptime".to_string(),
                    "public".to_string(),
                    tc.sink_table_name.clone(),
                ],
            )
            .unwrap();
            // TODO(discord9): assert the full expr
            let column_schemas = expr
                .column_defs
                .iter()
                .map(|c| try_as_column_schema(c).unwrap())
                .collect::<Vec<_>>();
            assert_eq!(tc.column_schemas, column_schemas);
            assert_eq!(tc.primary_keys, expr.primary_keys);
            assert_eq!(tc.time_index, expr.time_index);
        }
    }
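
    // A minimal illustrative sketch: exercises `build_primary_key_constraint` directly, mirroring
    // the last case of the test above. It assumes the same `numbers_with_ts` test table provided
    // by `create_test_query_engine`.
    #[tokio::test]
    async fn test_build_primary_key_constraint_sketch() {
        let query_engine = create_test_query_engine();
        let ctx = QueryContext::arc();
        let sql = "SELECT number, ts FROM numbers_with_ts GROUP BY ts, number";
        let plan = sql_to_df_plan(ctx, query_engine, sql, true).await.unwrap();

        let fields = plan.schema().fields();
        let (first_time_stamp, primary_keys) =
            build_primary_key_constraint(&plan, fields).unwrap();

        // the first timestamp column in the GROUP BY clause is picked as the time index...
        assert_eq!(first_time_stamp, Some("ts".to_string()));
        // ...and the remaining GROUP BY columns are kept as primary keys
        assert_eq!(primary_keys, vec!["number".to_string()]);
    }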
}