flow/batching_mode/task.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeSet, HashSet};
use std::sync::{Arc, RwLock};
use std::time::{Duration, SystemTime, UNIX_EPOCH};

use api::v1::CreateTableExpr;
use arrow_schema::Fields;
use catalog::CatalogManagerRef;
use common_error::ext::BoxedError;
use common_query::logical_plan::breakup_insert_plan;
use common_telemetry::tracing::warn;
use common_telemetry::{debug, info};
use common_time::Timestamp;
use datafusion::datasource::DefaultTableSource;
use datafusion::sql::unparser::expr_to_sql;
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_common::DFSchemaRef;
use datafusion_expr::{DmlStatement, LogicalPlan, WriteOp};
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnSchema, Schema};
use operator::expr_helper::column_schemas_to_defs;
use query::query_engine::DefaultSerializer;
use query::QueryEngineRef;
use session::context::QueryContextRef;
use snafu::{ensure, OptionExt, ResultExt};
use sql::parser::{ParseOptions, ParserContext};
use sql::statements::statement::Statement;
use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
use table::table::adapter::DfTableProviderAdapter;
use tokio::sync::oneshot;
use tokio::sync::oneshot::error::TryRecvError;
use tokio::time::Instant;

use crate::adapter::{AUTO_CREATED_PLACEHOLDER_TS_COL, AUTO_CREATED_UPDATE_AT_TS_COL};
use crate::batching_mode::frontend_client::FrontendClient;
use crate::batching_mode::state::{FilterExprInfo, TaskState};
use crate::batching_mode::time_window::TimeWindowExpr;
use crate::batching_mode::utils::{
    get_table_info_df_schema, sql_to_df_plan, AddAutoColumnRewriter, AddFilterRewriter,
    FindGroupByFinalName,
};
use crate::batching_mode::BatchingModeOptions;
use crate::df_optimizer::apply_df_optimizer;
use crate::error::{
    ConvertColumnSchemaSnafu, DatafusionSnafu, ExternalSnafu, InvalidQuerySnafu,
    SubstraitEncodeLogicalPlanSnafu, UnexpectedSnafu,
};
use crate::metrics::{
    METRIC_FLOW_BATCHING_ENGINE_ERROR_CNT, METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME,
    METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY, METRIC_FLOW_BATCHING_ENGINE_START_QUERY_CNT,
    METRIC_FLOW_ROWS,
};
use crate::{Error, FlowId};

/// The task's config, immutable once created
#[derive(Clone)]
pub struct TaskConfig {
    pub flow_id: FlowId,
    pub query: String,
    /// Output schema of the query
    pub output_schema: DFSchemaRef,
    pub time_window_expr: Option<TimeWindowExpr>,
    /// Data expiration duration, in seconds
    pub expire_after: Option<i64>,
    sink_table_name: [String; 3],
    pub source_table_names: HashSet<[String; 3]>,
    catalog_manager: CatalogManagerRef,
    query_type: QueryType,
    batch_opts: Arc<BatchingModeOptions>,
}

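/// Classify the flow query as TQL or SQL.
///
/// The query is parsed with the session's SQL dialect and must contain exactly one statement;
/// a `TQL` statement is classified as [`QueryType::Tql`], everything else as [`QueryType::Sql`].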
fn determine_query_type(query: &str, query_ctx: &QueryContextRef) -> Result<QueryType, Error> {
    let stmts =
        ParserContext::create_with_dialect(query, query_ctx.sql_dialect(), ParseOptions::default())
            .map_err(BoxedError::new)
            .context(ExternalSnafu)?;

    ensure!(
        stmts.len() == 1,
        InvalidQuerySnafu {
            reason: format!("Expect only one statement, found {}", stmts.len())
        }
    );
    let stmt = &stmts[0];
    match stmt {
        Statement::Tql(_) => Ok(QueryType::Tql),
        _ => Ok(QueryType::Sql),
    }
}

#[derive(Debug, Clone)]
enum QueryType {
    /// The query is a TQL query
    Tql,
    /// The query is a SQL query
    Sql,
}

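/// A batching mode flow task: the immutable [`TaskConfig`] plus the mutable [`TaskState`].
///
/// A hypothetical usage sketch (the surrounding variable names are assumptions, not part of
/// this module's API):
/// ```ignore
/// let task = BatchingTask::try_new(task_args)?;
/// // Make sure the sink table exists before the first query runs.
/// task.check_or_create_sink_table(&engine, &frontend_client).await?;
/// // Run the periodic query loop until the shutdown signal fires.
/// task.start_executing_loop(engine, frontend_client).await;
/// ```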
#[derive(Clone)]
pub struct BatchingTask {
    pub config: Arc<TaskConfig>,
    pub state: Arc<RwLock<TaskState>>,
}

/// Arguments for creating a batching task
pub struct TaskArgs<'a> {
    pub flow_id: FlowId,
    pub query: &'a str,
    pub plan: LogicalPlan,
    pub time_window_expr: Option<TimeWindowExpr>,
    pub expire_after: Option<i64>,
    pub sink_table_name: [String; 3],
    pub source_table_names: Vec<[String; 3]>,
    pub query_ctx: QueryContextRef,
    pub catalog_manager: CatalogManagerRef,
    pub shutdown_rx: oneshot::Receiver<()>,
    pub batch_opts: Arc<BatchingModeOptions>,
}

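/// A generated logical plan together with the time-window filter (if any) that was applied to
/// it, so the corresponding dirty windows can be re-added to the state if the query fails.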
pub struct PlanInfo {
    pub plan: LogicalPlan,
    pub filter: Option<FilterExprInfo>,
}

impl BatchingTask {
    #[allow(clippy::too_many_arguments)]
    pub fn try_new(
        TaskArgs {
            flow_id,
            query,
            plan,
            time_window_expr,
            expire_after,
            sink_table_name,
            source_table_names,
            query_ctx,
            catalog_manager,
            shutdown_rx,
            batch_opts,
        }: TaskArgs<'_>,
    ) -> Result<Self, Error> {
        Ok(Self {
            config: Arc::new(TaskConfig {
                flow_id,
                query: query.to_string(),
                time_window_expr,
                expire_after,
                sink_table_name,
                source_table_names: source_table_names.into_iter().collect(),
                catalog_manager,
                output_schema: plan.schema().clone(),
                query_type: determine_query_type(query, &query_ctx)?,
                batch_opts,
            }),
            state: Arc::new(RwLock::new(TaskState::new(query_ctx, shutdown_rx))),
        })
    }

    /// Mark the time window range (now - expire_after, now) as dirty (or (0, now) if
    /// expire_after is not set).
    ///
    /// Useful for `flush_flow` to flush the dirty time window ranges.
    pub fn mark_all_windows_as_dirty(&self) -> Result<(), Error> {
        let now = SystemTime::now();
        let now = Timestamp::new_second(
            now.duration_since(UNIX_EPOCH)
                .expect("Time went backwards")
                .as_secs() as _,
        );
        let lower_bound = self
            .config
            .expire_after
            .map(|e| now.sub_duration(Duration::from_secs(e as _)))
            .transpose()
            .map_err(BoxedError::new)
            .context(ExternalSnafu)?
            .unwrap_or(Timestamp::new_second(0));
        debug!(
            "Flow {} mark range ({:?}, {:?}) as dirty",
            self.config.flow_id, lower_bound, now
        );
        self.state
            .write()
            .unwrap()
            .dirty_time_windows
            .add_window(lower_bound, Some(now));
        Ok(())
    }

    /// Create the sink table if it doesn't exist
    pub async fn check_or_create_sink_table(
        &self,
        engine: &QueryEngineRef,
        frontend_client: &Arc<FrontendClient>,
    ) -> Result<Option<(u32, Duration)>, Error> {
        if !self.is_table_exist(&self.config.sink_table_name).await? {
            let create_table = self.gen_create_table_expr(engine.clone()).await?;
            info!(
                "Try creating sink table(if not exists) with expr: {:?}",
                create_table
            );
            self.create_table(frontend_client, create_table).await?;
            info!(
                "Sink table {}(if not exists) created",
                self.config.sink_table_name.join(".")
            );
        }

        Ok(None)
    }

    async fn is_table_exist(&self, table_name: &[String; 3]) -> Result<bool, Error> {
        self.config
            .catalog_manager
            .table_exists(&table_name[0], &table_name[1], &table_name[2], None)
            .await
            .map_err(BoxedError::new)
            .context(ExternalSnafu)
    }

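    /// Generate an insert plan for the pending dirty time windows and execute it once,
    /// returning the affected row count and elapsed time if a query was actually run.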
    pub async fn gen_exec_once(
        &self,
        engine: &QueryEngineRef,
        frontend_client: &Arc<FrontendClient>,
        max_window_cnt: Option<usize>,
    ) -> Result<Option<(u32, Duration)>, Error> {
        if let Some(new_query) = self.gen_insert_plan(engine, max_window_cnt).await? {
            debug!("Generate new query: {}", new_query.plan);
            self.execute_logical_plan(frontend_client, &new_query.plan)
                .await
        } else {
            debug!("Generate no query");
            Ok(None)
        }
    }

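    /// Generate an `INSERT INTO <sink_table>` logical plan from the flow query, restricted to
    /// the dirty time windows; returns `None` when there is currently nothing to query.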
    pub async fn gen_insert_plan(
        &self,
        engine: &QueryEngineRef,
        max_window_cnt: Option<usize>,
    ) -> Result<Option<PlanInfo>, Error> {
        let (table, df_schema) = get_table_info_df_schema(
            self.config.catalog_manager.clone(),
            self.config.sink_table_name.clone(),
        )
        .await?;

        let new_query = self
            .gen_query_with_time_window(
                engine.clone(),
                &table.table_info().meta.schema,
                max_window_cnt,
            )
            .await?;

        let insert_into_info = if let Some(new_query) = new_query {
            // First check that all columns in the input query exist in the sink table,
            // since the insert refers to column names in the record batches generated by the query.
            let table_columns = df_schema
                .columns()
                .into_iter()
                .map(|c| c.name)
                .collect::<BTreeSet<_>>();
            for column in new_query.plan.schema().columns() {
                ensure!(
                    table_columns.contains(column.name()),
                    InvalidQuerySnafu {
                        reason: format!(
                            "Column {} not found in sink table with columns {:?}",
                            column, table_columns
                        ),
                    }
                );
            }

            let table_provider = Arc::new(DfTableProviderAdapter::new(table));
            let table_source = Arc::new(DefaultTableSource::new(table_provider));

            // `update_at` and the time index placeholder (if present) should have default values
            let plan = LogicalPlan::Dml(DmlStatement::new(
                datafusion_common::TableReference::Full {
                    catalog: self.config.sink_table_name[0].clone().into(),
                    schema: self.config.sink_table_name[1].clone().into(),
                    table: self.config.sink_table_name[2].clone().into(),
                },
                table_source,
                WriteOp::Insert(datafusion_expr::dml::InsertOp::Append),
                Arc::new(new_query.plan),
            ));
            PlanInfo {
                plan,
                filter: new_query.filter,
            }
        } else {
            return Ok(None);
        };
        let insert_into = insert_into_info
            .plan
            .recompute_schema()
            .context(DatafusionSnafu {
                context: "Failed to recompute schema",
            })?;

        Ok(Some(PlanInfo {
            plan: insert_into,
            filter: insert_into_info.filter,
        }))
    }

    pub async fn create_table(
        &self,
        frontend_client: &Arc<FrontendClient>,
        expr: CreateTableExpr,
    ) -> Result<(), Error> {
        let catalog = &self.config.sink_table_name[0];
        let schema = &self.config.sink_table_name[1];
        frontend_client
            .create(expr.clone(), catalog, schema)
            .await?;
        Ok(())
    }

    pub async fn execute_logical_plan(
        &self,
        frontend_client: &Arc<FrontendClient>,
        plan: &LogicalPlan,
    ) -> Result<Option<(u32, Duration)>, Error> {
        let instant = Instant::now();
        let flow_id = self.config.flow_id;

        debug!(
            "Executing flow {flow_id}(expire_after={:?} secs) with query {}",
            self.config.expire_after, &plan
        );

        let catalog = &self.config.sink_table_name[0];
        let schema = &self.config.sink_table_name[1];

        // Fix all table references by making them fully qualified,
        // i.e. "table_name" => "catalog_name.schema_name.table_name"
        let plan = plan
            .clone()
            .transform_down_with_subqueries(|p| {
                if let LogicalPlan::TableScan(mut table_scan) = p {
                    let resolved = table_scan.table_name.resolve(catalog, schema);
                    table_scan.table_name = resolved.into();
                    Ok(Transformed::yes(LogicalPlan::TableScan(table_scan)))
                } else {
                    Ok(Transformed::no(p))
                }
            })
            .with_context(|_| DatafusionSnafu {
                context: format!("Failed to fix table ref in logical plan, plan={:?}", plan),
            })?
            .data;

        let mut peer_desc = None;

        let res = {
            let _timer = METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME
                .with_label_values(&[flow_id.to_string().as_str()])
                .start_timer();

            // Special-case handling for the insert logical plan:
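            // if the plan is an `INSERT INTO`, split it into the target table and the inner
            // query plan, encode the query as Substrait, and send it as an `InsertIntoPlan`
            // request; otherwise encode the whole plan and send it as a plain logical-plan query.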
            let req = if let Some((insert_to, insert_plan)) =
                breakup_insert_plan(&plan, catalog, schema)
            {
                let message = DFLogicalSubstraitConvertor {}
                    .encode(&insert_plan, DefaultSerializer)
                    .context(SubstraitEncodeLogicalPlanSnafu)?;
                api::v1::greptime_request::Request::Query(api::v1::QueryRequest {
                    query: Some(api::v1::query_request::Query::InsertIntoPlan(
                        api::v1::InsertIntoPlan {
                            table_name: Some(insert_to),
                            logical_plan: message.to_vec(),
                        },
                    )),
                })
            } else {
                let message = DFLogicalSubstraitConvertor {}
                    .encode(&plan, DefaultSerializer)
                    .context(SubstraitEncodeLogicalPlanSnafu)?;

                api::v1::greptime_request::Request::Query(api::v1::QueryRequest {
                    query: Some(api::v1::query_request::Query::LogicalPlan(message.to_vec())),
                })
            };

            frontend_client
                .handle(req, catalog, schema, &mut peer_desc)
                .await
        };

        let elapsed = instant.elapsed();
        if let Ok(affected_rows) = &res {
            debug!(
                "Flow {flow_id} executed, affected_rows: {affected_rows:?}, elapsed: {:?}",
                elapsed
            );
            METRIC_FLOW_ROWS
                .with_label_values(&[format!("{}-out-batching", flow_id).as_str()])
                .inc_by(*affected_rows as _);
        } else if let Err(err) = &res {
            warn!(
                "Failed to execute Flow {flow_id} on frontend {:?}, result: {err:?}, elapsed: {:?} with query: {}",
                peer_desc, elapsed, &plan
            );
        }

        // record slow query
        if elapsed >= self.config.batch_opts.slow_query_threshold {
            warn!(
                "Flow {flow_id} on frontend {:?} executed for {:?} before complete, query: {}",
                peer_desc, elapsed, &plan
            );
            METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY
                .with_label_values(&[
                    flow_id.to_string().as_str(),
                    &peer_desc.unwrap_or_default().to_string(),
                ])
                .observe(elapsed.as_secs_f64());
        }

        self.state
            .write()
            .unwrap()
            .after_query_exec(elapsed, res.is_ok());

        let res = res?;

        Ok(Some((res, elapsed)))
    }

    /// Start executing the query in a loop, breaking when the shutdown signal is received.
    ///
    /// Any error encountered while executing the query is logged.
    pub async fn start_executing_loop(
        &self,
        engine: QueryEngineRef,
        frontend_client: Arc<FrontendClient>,
    ) {
        let flow_id_str = self.config.flow_id.to_string();
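        // `max_window_cnt` limits how many dirty windows are queried at once: `None` means use
        // the configured `experimental_max_filter_num_per_query`; after a failed query it drops
        // to 1, then grows by one per successful query back up to that cap.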
        let mut max_window_cnt = None;
        loop {
            // First check whether the shutdown signal has been received;
            // if so, break the loop.
            {
                let mut state = self.state.write().unwrap();
                match state.shutdown_rx.try_recv() {
                    Ok(()) => break,
                    Err(TryRecvError::Closed) => {
                        warn!(
                            "Unexpected shutdown flow {}, shutdown anyway",
                            self.config.flow_id
                        );
                        break;
                    }
                    Err(TryRecvError::Empty) => (),
                }
            }
            METRIC_FLOW_BATCHING_ENGINE_START_QUERY_CNT
                .with_label_values(&[&flow_id_str])
                .inc();

            let min_refresh = self.config.batch_opts.experimental_min_refresh_duration;

            let new_query = match self.gen_insert_plan(&engine, max_window_cnt).await {
                Ok(new_query) => new_query,
                Err(err) => {
                    common_telemetry::error!(err; "Failed to generate query for flow={}", self.config.flow_id);
                    // Also sleep for a little while before trying again, to avoid flooding the logs.
                    tokio::time::sleep(min_refresh).await;
                    continue;
                }
            };

            let res = if let Some(new_query) = &new_query {
                self.execute_logical_plan(&frontend_client, &new_query.plan)
                    .await
            } else {
                Ok(None)
            };

            match res {
                // Normal execution; sleep for some time before the next query.
                Ok(Some(_)) => {
                    // Increase max_window_cnt so more windows can be queried next time.
                    max_window_cnt = max_window_cnt.map(|cnt| {
                        (cnt + 1).min(self.config.batch_opts.experimental_max_filter_num_per_query)
                    });
                    let sleep_until = {
                        let state = self.state.write().unwrap();

                        let time_window_size = self
                            .config
                            .time_window_expr
                            .as_ref()
                            .and_then(|t| *t.time_window_size());

                        state.get_next_start_query_time(
                            self.config.flow_id,
                            &time_window_size,
                            min_refresh,
                            Some(self.config.batch_opts.query_timeout),
                            self.config.batch_opts.experimental_max_filter_num_per_query,
                        )
                    };
                    tokio::time::sleep_until(sleep_until).await;
                }
                // No new data; sleep for some time before checking for new data again.
                Ok(None) => {
                    debug!(
                        "Flow id = {:?} found no new data, sleep for {:?} then continue",
                        self.config.flow_id, min_refresh
                    );
                    tokio::time::sleep(min_refresh).await;
                    continue;
                }
                // TODO(discord9): this error should have a better place to go, but for now just print the error; more context is needed
                Err(err) => {
                    METRIC_FLOW_BATCHING_ENGINE_ERROR_CNT
                        .with_label_values(&[&flow_id_str])
                        .inc();
                    match new_query {
                        Some(query) => {
                            common_telemetry::error!(err; "Failed to execute query for flow={} with query: {}", self.config.flow_id, query.plan);
                            // Re-add the dirty windows since the query failed.
                            self.state.write().unwrap().dirty_time_windows.add_windows(
                                query.filter.map(|f| f.time_ranges).unwrap_or_default(),
                            );
                            // TODO(discord9): add some backoff here? e.g. halve the query time window;
                            // backoff here means using a smaller `max_window_cnt` for the next query.

                            // Since the last query failed, don't try to query too many windows.
                            max_window_cnt = Some(1);
                        }
                        None => {
                            common_telemetry::error!(err; "Failed to generate query for flow={}", self.config.flow_id)
                        }
                    }
                    // Also sleep for a little while before trying again, to avoid flooding the logs.
                    tokio::time::sleep(min_refresh).await;
                }
            }
        }
    }

    /// Generate the `CREATE TABLE` expression for the sink table.
    ///
    /// The auto-created table will have an `update_at` millisecond-timestamp column
    /// (DEFAULT now()) appended at the end (for compatibility with flow streaming mode),
    /// and it will use the first timestamp column as the time index; all other columns are
    /// added as nullable normal columns.
    async fn gen_create_table_expr(
        &self,
        engine: QueryEngineRef,
    ) -> Result<CreateTableExpr, Error> {
        let query_ctx = self.state.read().unwrap().query_ctx.clone();
        let plan =
            sql_to_df_plan(query_ctx.clone(), engine.clone(), &self.config.query, true).await?;
        create_table_with_expr(&plan, &self.config.sink_table_name)
    }
    /// Merge the dirty time windows and use at most `max_window_cnt` of them (defaulting to the
    /// configured `experimental_max_filter_num_per_query`) when generating the query.
    async fn gen_query_with_time_window(
        &self,
        engine: QueryEngineRef,
        sink_table_schema: &Arc<Schema>,
        max_window_cnt: Option<usize>,
    ) -> Result<Option<PlanInfo>, Error> {
        let query_ctx = self.state.read().unwrap().query_ctx.clone();
        let start = SystemTime::now();
        let since_the_epoch = start
            .duration_since(UNIX_EPOCH)
            .expect("Time went backwards");
        let low_bound = self
            .config
            .expire_after
            .map(|e| since_the_epoch.as_secs() - e as u64)
            .unwrap_or(u64::MIN);

        let low_bound = Timestamp::new_second(low_bound as i64);

        let expire_time_window_bound = self
            .config
            .time_window_expr
            .as_ref()
            .map(|expr| expr.eval(low_bound))
            .transpose()?;

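        // Proceed with window-based filtering only for SQL queries where the time window
        // expression yields both a lower and an upper bound for the expire time.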
        let (Some((Some(l), Some(u))), QueryType::Sql) =
            (expire_time_window_bound, &self.config.query_type)
        else {
            // Either there is no time window or the query is not a SQL query, so just use the original query.
            // Use the sink table meta to also add the `update_at` and `__ts_placeholder` columns'
            // values to the query, for compatibility reasons.
            debug!(
                "Flow id = {:?}, can't get window size: precise_lower_bound={expire_time_window_bound:?}, using the same query", self.config.flow_id
            );
            // Also clean the dirty time windows; these could come from create flow's check_execute.
            self.state.write().unwrap().dirty_time_windows.clean();

            // TODO(discord9): don't add auto columns for TQL queries?
            let mut add_auto_column = AddAutoColumnRewriter::new(sink_table_schema.clone());

            let plan = sql_to_df_plan(query_ctx.clone(), engine.clone(), &self.config.query, false)
                .await?;

            let plan = plan
                .clone()
                .rewrite(&mut add_auto_column)
                .with_context(|_| DatafusionSnafu {
                    context: format!("Failed to rewrite plan:\n {}\n", plan),
                })?
                .data;

            // Since no time window lower/upper bound was found, just return the original query (with auto columns).
            return Ok(Some(PlanInfo { plan, filter: None }));
        };

        debug!(
            "Flow id = {:?}, found time window: precise_lower_bound={:?}, precise_upper_bound={:?} with dirty time windows: {:?}",
            self.config.flow_id, l, u, self.state.read().unwrap().dirty_time_windows
        );
        let window_size = u.sub(&l).with_context(|| UnexpectedSnafu {
            reason: format!("Can't get window size from {u:?} - {l:?}"),
        })?;
        let col_name = self
            .config
            .time_window_expr
            .as_ref()
            .map(|expr| expr.column_name.clone())
            .with_context(|| UnexpectedSnafu {
                reason: format!(
                    "Flow id={:?}, Failed to get column name from time window expr",
                    self.config.flow_id
                ),
            })?;

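        // Build a filter expression from the merged dirty time windows, passing the expire
        // lower bound `l` and capping the number of windows per query.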
        let expr = self
            .state
            .write()
            .unwrap()
            .dirty_time_windows
            .gen_filter_exprs(
                &col_name,
                Some(l),
                window_size,
                max_window_cnt
                    .unwrap_or(self.config.batch_opts.experimental_max_filter_num_per_query),
                self.config.flow_id,
                Some(self),
            )?;

        debug!(
            "Flow id={:?}, Generated filter expr: {:?}",
            self.config.flow_id,
            expr.as_ref()
                .map(
                    |expr| expr_to_sql(&expr.expr).with_context(|_| DatafusionSnafu {
                        context: format!("Failed to generate filter expr from {expr:?}"),
                    })
                )
                .transpose()?
                .map(|s| s.to_string())
        );

        let Some(expr) = expr else {
            // No new data, hence no need to update.
            debug!("Flow id={:?}, no new data, not update", self.config.flow_id);
            return Ok(None);
        };

        let mut add_filter = AddFilterRewriter::new(expr.expr.clone());
        let mut add_auto_column = AddAutoColumnRewriter::new(sink_table_schema.clone());

        let plan =
            sql_to_df_plan(query_ctx.clone(), engine.clone(), &self.config.query, false).await?;
        let rewrite = plan
            .clone()
            .rewrite(&mut add_filter)
            .and_then(|p| p.data.rewrite(&mut add_auto_column))
            .with_context(|_| DatafusionSnafu {
                context: format!("Failed to rewrite plan:\n {}\n", plan),
            })?
            .data;
        // Only apply the optimizer after the complex rewrites are done.
        let new_plan = apply_df_optimizer(rewrite).await?;

        let info = PlanInfo {
            plan: new_plan.clone(),
            filter: Some(expr),
        };

        Ok(Some(info))
    }
}

// An auto-created table has an auto-added `update_at` column, and optionally an
// `AUTO_CREATED_PLACEHOLDER_TS_COL` column as a time index placeholder if no timestamp column is specified.
// TODO(discord9): for now no default value is set for the auto-added columns, for compatibility
// with streaming mode, but this might change in favor of simpler code?
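// For example (see the tests below), `SELECT max(number), ts FROM numbers_with_ts GROUP BY ts`
// yields columns `max(numbers_with_ts.number)` (nullable), `ts` (time index) and `update_at`,
// with no placeholder column since `ts` is a timestamp appearing in the GROUP BY clause.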
fn create_table_with_expr(
    plan: &LogicalPlan,
    sink_table_name: &[String; 3],
) -> Result<CreateTableExpr, Error> {
    let fields = plan.schema().fields();
    let (first_time_stamp, primary_keys) = build_primary_key_constraint(plan, fields)?;

    let mut column_schemas = Vec::new();
    for field in fields {
        let name = field.name();
        let ty = ConcreteDataType::from_arrow_type(field.data_type());
        let col_schema = if first_time_stamp == Some(name.clone()) {
            ColumnSchema::new(name, ty, false).with_time_index(true)
        } else {
            ColumnSchema::new(name, ty, true)
        };
        column_schemas.push(col_schema);
    }

    let update_at_schema = ColumnSchema::new(
        AUTO_CREATED_UPDATE_AT_TS_COL,
        ConcreteDataType::timestamp_millisecond_datatype(),
        true,
    );
    column_schemas.push(update_at_schema);

    let time_index = if let Some(time_index) = first_time_stamp {
        time_index
    } else {
        column_schemas.push(
            ColumnSchema::new(
                AUTO_CREATED_PLACEHOLDER_TS_COL,
                ConcreteDataType::timestamp_millisecond_datatype(),
                false,
            )
            .with_time_index(true),
        );
        AUTO_CREATED_PLACEHOLDER_TS_COL.to_string()
    };

    let column_defs =
        column_schemas_to_defs(column_schemas, &primary_keys).context(ConvertColumnSchemaSnafu)?;
    Ok(CreateTableExpr {
        catalog_name: sink_table_name[0].clone(),
        schema_name: sink_table_name[1].clone(),
        table_name: sink_table_name[2].clone(),
        desc: "Auto created table by flow engine".to_string(),
        column_defs,
        time_index,
        primary_keys,
        create_if_not_exists: true,
        table_options: Default::default(),
        table_id: None,
        engine: "mito".to_string(),
    })
}

/// Return the first timestamp column that appears in the GROUP BY clause, together with the
/// other columns that also appear in the GROUP BY clause.
///
/// # Returns
///
/// * `Option<String>` - the first timestamp column in the GROUP BY clause
/// * `Vec<String>` - the other columns that are also in the GROUP BY clause
fn build_primary_key_constraint(
    plan: &LogicalPlan,
    schema: &Fields,
) -> Result<(Option<String>, Vec<String>), Error> {
    let mut pk_names = FindGroupByFinalName::default();

    plan.visit(&mut pk_names)
        .with_context(|_| DatafusionSnafu {
            context: format!("Can't find aggr expr in plan {plan:?}"),
        })?;

    // If there is no GROUP BY clause, return empty results.
    let pk_final_names = pk_names.get_group_expr_names().unwrap_or_default();
    if pk_final_names.is_empty() {
        return Ok((None, Vec::new()));
    }

    let all_pk_cols: Vec<_> = schema
        .iter()
        .filter(|f| pk_final_names.contains(f.name()))
        .map(|f| f.name().clone())
        .collect();
    // The auto-created table uses the first timestamp column in the GROUP BY clause as the time index.
    let first_time_stamp = schema
        .iter()
        .find(|f| {
            all_pk_cols.contains(&f.name().clone())
                && ConcreteDataType::from_arrow_type(f.data_type()).is_timestamp()
        })
        .map(|f| f.name().clone());

    let all_pk_cols: Vec<_> = all_pk_cols
        .into_iter()
        .filter(|col| first_time_stamp != Some(col.to_string()))
        .collect();

    Ok((first_time_stamp, all_pk_cols))
}

#[cfg(test)]
mod test {
    use api::v1::column_def::try_as_column_schema;
    use pretty_assertions::assert_eq;
    use session::context::QueryContext;

    use super::*;
    use crate::test_utils::create_test_query_engine;

    #[tokio::test]
    async fn test_gen_create_table_sql() {
        let query_engine = create_test_query_engine();
        let ctx = QueryContext::arc();
        struct TestCase {
            sql: String,
            sink_table_name: String,
            column_schemas: Vec<ColumnSchema>,
            primary_keys: Vec<String>,
            time_index: String,
        }

        let update_at_schema = ColumnSchema::new(
            AUTO_CREATED_UPDATE_AT_TS_COL,
            ConcreteDataType::timestamp_millisecond_datatype(),
            true,
        );

        let ts_placeholder_schema = ColumnSchema::new(
            AUTO_CREATED_PLACEHOLDER_TS_COL,
            ConcreteDataType::timestamp_millisecond_datatype(),
            false,
        )
        .with_time_index(true);

        let testcases = vec![
            TestCase {
                sql: "SELECT number, ts FROM numbers_with_ts".to_string(),
                sink_table_name: "new_table".to_string(),
                column_schemas: vec![
                    ColumnSchema::new("number", ConcreteDataType::uint32_datatype(), true),
                    ColumnSchema::new(
                        "ts",
                        ConcreteDataType::timestamp_millisecond_datatype(),
                        true,
                    ),
                    update_at_schema.clone(),
                    ts_placeholder_schema.clone(),
                ],
                primary_keys: vec![],
                time_index: AUTO_CREATED_PLACEHOLDER_TS_COL.to_string(),
            },
            TestCase {
                sql: "SELECT number, max(ts) FROM numbers_with_ts GROUP BY number".to_string(),
                sink_table_name: "new_table".to_string(),
                column_schemas: vec![
                    ColumnSchema::new("number", ConcreteDataType::uint32_datatype(), true),
                    ColumnSchema::new(
                        "max(numbers_with_ts.ts)",
                        ConcreteDataType::timestamp_millisecond_datatype(),
                        true,
                    ),
                    update_at_schema.clone(),
                    ts_placeholder_schema.clone(),
                ],
                primary_keys: vec!["number".to_string()],
                time_index: AUTO_CREATED_PLACEHOLDER_TS_COL.to_string(),
            },
            TestCase {
                sql: "SELECT max(number), ts FROM numbers_with_ts GROUP BY ts".to_string(),
                sink_table_name: "new_table".to_string(),
                column_schemas: vec![
                    ColumnSchema::new(
                        "max(numbers_with_ts.number)",
                        ConcreteDataType::uint32_datatype(),
                        true,
                    ),
                    ColumnSchema::new(
                        "ts",
                        ConcreteDataType::timestamp_millisecond_datatype(),
                        false,
                    )
                    .with_time_index(true),
                    update_at_schema.clone(),
                ],
                primary_keys: vec![],
                time_index: "ts".to_string(),
            },
            TestCase {
                sql: "SELECT number, ts FROM numbers_with_ts GROUP BY ts, number".to_string(),
                sink_table_name: "new_table".to_string(),
                column_schemas: vec![
                    ColumnSchema::new("number", ConcreteDataType::uint32_datatype(), true),
                    ColumnSchema::new(
                        "ts",
                        ConcreteDataType::timestamp_millisecond_datatype(),
                        false,
                    )
                    .with_time_index(true),
                    update_at_schema.clone(),
                ],
                primary_keys: vec!["number".to_string()],
                time_index: "ts".to_string(),
            },
        ];

        for tc in testcases {
            let plan = sql_to_df_plan(ctx.clone(), query_engine.clone(), &tc.sql, true)
                .await
                .unwrap();
            let expr = create_table_with_expr(
                &plan,
                &[
                    "greptime".to_string(),
                    "public".to_string(),
                    tc.sink_table_name.clone(),
                ],
            )
            .unwrap();
            // TODO(discord9): assert expr
            let column_schemas = expr
                .column_defs
                .iter()
                .map(|c| try_as_column_schema(c).unwrap())
                .collect::<Vec<_>>();
            assert_eq!(tc.column_schemas, column_schemas);
            assert_eq!(tc.primary_keys, expr.primary_keys);
            assert_eq!(tc.time_index, expr.time_index);
        }
    }
}