flow/batching_mode/task.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeSet, HashSet};
use std::sync::{Arc, RwLock};
use std::time::{Duration, SystemTime, UNIX_EPOCH};

use api::v1::CreateTableExpr;
use catalog::CatalogManagerRef;
use common_error::ext::BoxedError;
use common_query::logical_plan::breakup_insert_plan;
use common_telemetry::tracing::warn;
use common_telemetry::{debug, info};
use common_time::Timestamp;
use datafusion::datasource::DefaultTableSource;
use datafusion::sql::unparser::expr_to_sql;
use datafusion_common::DFSchemaRef;
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_expr::{DmlStatement, LogicalPlan, WriteOp};
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnSchema, Schema};
use operator::expr_helper::column_schemas_to_defs;
use query::QueryEngineRef;
use query::query_engine::DefaultSerializer;
use session::context::QueryContextRef;
use snafu::{OptionExt, ResultExt, ensure};
use sql::parser::{ParseOptions, ParserContext};
use sql::statements::statement::Statement;
use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
use table::table::adapter::DfTableProviderAdapter;
use tokio::sync::oneshot;
use tokio::sync::oneshot::error::TryRecvError;
use tokio::time::Instant;

use crate::adapter::{AUTO_CREATED_PLACEHOLDER_TS_COL, AUTO_CREATED_UPDATE_AT_TS_COL};
use crate::batching_mode::BatchingModeOptions;
use crate::batching_mode::frontend_client::FrontendClient;
use crate::batching_mode::state::{FilterExprInfo, TaskState};
use crate::batching_mode::time_window::TimeWindowExpr;
use crate::batching_mode::utils::{
    AddFilterRewriter, ColumnMatcherRewriter, FindGroupByFinalName, gen_plan_with_matching_schema,
    get_table_info_df_schema, sql_to_df_plan,
};
use crate::df_optimizer::apply_df_optimizer;
use crate::error::{
    ConvertColumnSchemaSnafu, DatafusionSnafu, ExternalSnafu, InvalidQuerySnafu,
    SubstraitEncodeLogicalPlanSnafu, UnexpectedSnafu,
};
use crate::metrics::{
    METRIC_FLOW_BATCHING_ENGINE_ERROR_CNT, METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME,
    METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY, METRIC_FLOW_BATCHING_ENGINE_START_QUERY_CNT,
    METRIC_FLOW_ROWS,
};
use crate::{Error, FlowId};

/// The task's config, immutable once created
#[derive(Clone)]
pub struct TaskConfig {
    pub flow_id: FlowId,
    pub query: String,
    /// output schema of the query
    pub output_schema: DFSchemaRef,
    pub time_window_expr: Option<TimeWindowExpr>,
    /// in seconds
    pub expire_after: Option<i64>,
    pub sink_table_name: [String; 3],
    pub source_table_names: HashSet<[String; 3]>,
    pub catalog_manager: CatalogManagerRef,
    pub query_type: QueryType,
    pub batch_opts: Arc<BatchingModeOptions>,
    pub flow_eval_interval: Option<Duration>,
}

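/// Determine whether the flow query is a TQL or a SQL query.
///
/// A minimal sketch of the expected behavior (illustrative queries, not a compiled doctest):
///
/// ```ignore
/// let ctx = QueryContext::arc();
/// assert_eq!(determine_query_type("SELECT 1", &ctx)?, QueryType::Sql);
/// assert_eq!(
///     determine_query_type("TQL EVAL (0, 100, '10s') some_metric", &ctx)?,
///     QueryType::Tql
/// );
/// ```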
fn determine_query_type(query: &str, query_ctx: &QueryContextRef) -> Result<QueryType, Error> {
    let stmts =
        ParserContext::create_with_dialect(query, query_ctx.sql_dialect(), ParseOptions::default())
            .map_err(BoxedError::new)
            .context(ExternalSnafu)?;

    ensure!(
        stmts.len() == 1,
        InvalidQuerySnafu {
            reason: format!("Expect only one statement, found {}", stmts.len())
        }
    );
    let stmt = &stmts[0];
    match stmt {
        Statement::Tql(_) => Ok(QueryType::Tql),
        _ => Ok(QueryType::Sql),
    }
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub enum QueryType {
    /// query is a tql query
    Tql,
    /// query is a sql query
    Sql,
}

#[derive(Clone)]
pub struct BatchingTask {
    pub config: Arc<TaskConfig>,
    pub state: Arc<RwLock<TaskState>>,
}

/// Arguments for creating batching task
pub struct TaskArgs<'a> {
    pub flow_id: FlowId,
    pub query: &'a str,
    pub plan: LogicalPlan,
    pub time_window_expr: Option<TimeWindowExpr>,
    pub expire_after: Option<i64>,
    pub sink_table_name: [String; 3],
    pub source_table_names: Vec<[String; 3]>,
    pub query_ctx: QueryContextRef,
    pub catalog_manager: CatalogManagerRef,
    pub shutdown_rx: oneshot::Receiver<()>,
    pub batch_opts: Arc<BatchingModeOptions>,
    pub flow_eval_interval: Option<Duration>,
}

pub struct PlanInfo {
    pub plan: LogicalPlan,
    pub filter: Option<FilterExprInfo>,
}

impl BatchingTask {
    #[allow(clippy::too_many_arguments)]
    pub fn try_new(
        TaskArgs {
            flow_id,
            query,
            plan,
            time_window_expr,
            expire_after,
            sink_table_name,
            source_table_names,
            query_ctx,
            catalog_manager,
            shutdown_rx,
            batch_opts,
            flow_eval_interval,
        }: TaskArgs<'_>,
    ) -> Result<Self, Error> {
        Ok(Self {
            config: Arc::new(TaskConfig {
                flow_id,
                query: query.to_string(),
                time_window_expr,
                expire_after,
                sink_table_name,
                source_table_names: source_table_names.into_iter().collect(),
                catalog_manager,
                output_schema: plan.schema().clone(),
                query_type: determine_query_type(query, &query_ctx)?,
                batch_opts,
                flow_eval_interval,
            }),
            state: Arc::new(RwLock::new(TaskState::new(query_ctx, shutdown_rx))),
        })
    }

    /// Mark the time window range (now - expire_after, now) as dirty (or (0, now) if expire_after is not set).
    ///
    /// Useful for flush_flow to flush the dirty time window range.
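    ///
    /// A rough sketch of the effect (illustrative times, not a compiled doctest),
    /// assuming `expire_after` is 3600 seconds:
    ///
    /// ```ignore
    /// // with "now" at 01:00:00 UTC, the range (00:00:00, 01:00:00) is added to the
    /// // task's dirty time windows
    /// task.mark_all_windows_as_dirty()?;
    /// ```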
    pub fn mark_all_windows_as_dirty(&self) -> Result<(), Error> {
        let now = SystemTime::now();
        let now = Timestamp::new_second(
            now.duration_since(UNIX_EPOCH)
                .expect("Time went backwards")
                .as_secs() as _,
        );
        let lower_bound = self
            .config
            .expire_after
            .map(|e| now.sub_duration(Duration::from_secs(e as _)))
            .transpose()
            .map_err(BoxedError::new)
            .context(ExternalSnafu)?
            .unwrap_or(Timestamp::new_second(0));
        debug!(
            "Flow {} mark range ({:?}, {:?}) as dirty",
            self.config.flow_id, lower_bound, now
        );
        self.state
            .write()
            .unwrap()
            .dirty_time_windows
            .add_window(lower_bound, Some(now));
        Ok(())
    }

    /// Create sink table if not exists
    pub async fn check_or_create_sink_table(
        &self,
        engine: &QueryEngineRef,
        frontend_client: &Arc<FrontendClient>,
    ) -> Result<Option<(u32, Duration)>, Error> {
        if !self.is_table_exist(&self.config.sink_table_name).await? {
            let create_table = self.gen_create_table_expr(engine.clone()).await?;
            info!(
                "Try creating sink table(if not exists) with expr: {:?}",
                create_table
            );
            self.create_table(frontend_client, create_table).await?;
            info!(
                "Sink table {}(if not exists) created",
                self.config.sink_table_name.join(".")
            );
        }

        Ok(None)
    }

    async fn is_table_exist(&self, table_name: &[String; 3]) -> Result<bool, Error> {
        self.config
            .catalog_manager
            .table_exists(&table_name[0], &table_name[1], &table_name[2], None)
            .await
            .map_err(BoxedError::new)
            .context(ExternalSnafu)
    }

    pub async fn gen_exec_once(
        &self,
        engine: &QueryEngineRef,
        frontend_client: &Arc<FrontendClient>,
        max_window_cnt: Option<usize>,
    ) -> Result<Option<(u32, Duration)>, Error> {
        if let Some(new_query) = self.gen_insert_plan(engine, max_window_cnt).await? {
            debug!("Generate new query: {}", new_query.plan);
            self.execute_logical_plan(frontend_client, &new_query.plan)
                .await
        } else {
            debug!("Generate no query");
            Ok(None)
        }
    }

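    /// Generate the insert plan for the sink table, i.e. a DML `Insert` plan wrapping the
    /// flow query restricted to the dirty time windows, or `None` if there is nothing to do.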
    pub async fn gen_insert_plan(
        &self,
        engine: &QueryEngineRef,
        max_window_cnt: Option<usize>,
    ) -> Result<Option<PlanInfo>, Error> {
        let (table, df_schema) = get_table_info_df_schema(
            self.config.catalog_manager.clone(),
            self.config.sink_table_name.clone(),
        )
        .await?;

        let new_query = self
            .gen_query_with_time_window(
                engine.clone(),
                &table.table_info().meta.schema,
                max_window_cnt,
            )
            .await?;

        let insert_into_info = if let Some(new_query) = new_query {
            // first check that all columns in the input query exist in the sink table,
            // since the insert refers to names in the record batches generated by the given query
            let table_columns = df_schema
                .columns()
                .into_iter()
                .map(|c| c.name)
                .collect::<BTreeSet<_>>();
            for column in new_query.plan.schema().columns() {
                ensure!(
                    table_columns.contains(column.name()),
                    InvalidQuerySnafu {
                        reason: format!(
                            "Column {} not found in sink table with columns {:?}",
                            column, table_columns
                        ),
                    }
                );
            }

            let table_provider = Arc::new(DfTableProviderAdapter::new(table));
            let table_source = Arc::new(DefaultTableSource::new(table_provider));

            // update_at & the time index placeholder (if present) should have default values
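            // Conceptually the wrapped plan corresponds to an
            //   INSERT INTO <sink_table> SELECT ... FROM <source> WHERE <dirty time window filter>
            // statement (a sketch; the actual request is sent as a substrait-encoded logical plan)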
            let plan = LogicalPlan::Dml(DmlStatement::new(
                datafusion_common::TableReference::Full {
                    catalog: self.config.sink_table_name[0].clone().into(),
                    schema: self.config.sink_table_name[1].clone().into(),
                    table: self.config.sink_table_name[2].clone().into(),
                },
                table_source,
                WriteOp::Insert(datafusion_expr::dml::InsertOp::Append),
                Arc::new(new_query.plan),
            ));
            PlanInfo {
                plan,
                filter: new_query.filter,
            }
        } else {
            return Ok(None);
        };
        let insert_into = insert_into_info
            .plan
            .recompute_schema()
            .context(DatafusionSnafu {
                context: "Failed to recompute schema",
            })?;

        Ok(Some(PlanInfo {
            plan: insert_into,
            filter: insert_into_info.filter,
        }))
    }

    pub async fn create_table(
        &self,
        frontend_client: &Arc<FrontendClient>,
        expr: CreateTableExpr,
    ) -> Result<(), Error> {
        let catalog = &self.config.sink_table_name[0];
        let schema = &self.config.sink_table_name[1];
        frontend_client
            .create(expr.clone(), catalog, schema)
            .await?;
        Ok(())
    }

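    /// Encode the plan with substrait and send it to a frontend node for execution,
    /// returning the affected row count and the elapsed time.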
    pub async fn execute_logical_plan(
        &self,
        frontend_client: &Arc<FrontendClient>,
        plan: &LogicalPlan,
    ) -> Result<Option<(u32, Duration)>, Error> {
        let instant = Instant::now();
        let flow_id = self.config.flow_id;

        debug!(
            "Executing flow {flow_id}(expire_after={:?} secs) with query {}",
            self.config.expire_after, &plan
        );

        let catalog = &self.config.sink_table_name[0];
        let schema = &self.config.sink_table_name[1];

        // fix all table refs by making them fully qualified, i.e. "table_name" => "catalog_name.schema_name.table_name"
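        // e.g. a bare scan of `numbers_with_ts` resolves to `greptime.public.numbers_with_ts`
        // (illustrative catalog/schema names)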
        let plan = plan
            .clone()
            .transform_down_with_subqueries(|p| {
                if let LogicalPlan::TableScan(mut table_scan) = p {
                    let resolved = table_scan.table_name.resolve(catalog, schema);
                    table_scan.table_name = resolved.into();
                    Ok(Transformed::yes(LogicalPlan::TableScan(table_scan)))
                } else {
                    Ok(Transformed::no(p))
                }
            })
            .with_context(|_| DatafusionSnafu {
                context: format!("Failed to fix table ref in logical plan, plan={:?}", plan),
            })?
            .data;

        let mut peer_desc = None;

        let res = {
            let _timer = METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME
                .with_label_values(&[flow_id.to_string().as_str()])
                .start_timer();

            // hack: special handling for the insert logical plan
            let req = if let Some((insert_to, insert_plan)) =
                breakup_insert_plan(&plan, catalog, schema)
            {
                let message = DFLogicalSubstraitConvertor {}
                    .encode(&insert_plan, DefaultSerializer)
                    .context(SubstraitEncodeLogicalPlanSnafu)?;
                api::v1::greptime_request::Request::Query(api::v1::QueryRequest {
                    query: Some(api::v1::query_request::Query::InsertIntoPlan(
                        api::v1::InsertIntoPlan {
                            table_name: Some(insert_to),
                            logical_plan: message.to_vec(),
                        },
                    )),
                })
            } else {
                let message = DFLogicalSubstraitConvertor {}
                    .encode(&plan, DefaultSerializer)
                    .context(SubstraitEncodeLogicalPlanSnafu)?;

                api::v1::greptime_request::Request::Query(api::v1::QueryRequest {
                    query: Some(api::v1::query_request::Query::LogicalPlan(message.to_vec())),
                })
            };

            frontend_client
                .handle(req, catalog, schema, &mut peer_desc)
                .await
        };

        let elapsed = instant.elapsed();
        if let Ok(affected_rows) = &res {
            debug!(
                "Flow {flow_id} executed, affected_rows: {affected_rows:?}, elapsed: {:?}",
                elapsed
            );
            METRIC_FLOW_ROWS
                .with_label_values(&[format!("{}-out-batching", flow_id).as_str()])
                .inc_by(*affected_rows as _);
        } else if let Err(err) = &res {
            warn!(
                "Failed to execute Flow {flow_id} on frontend {:?}, result: {err:?}, elapsed: {:?} with query: {}",
                peer_desc, elapsed, &plan
            );
        }

        // record slow query
        if elapsed >= self.config.batch_opts.slow_query_threshold {
            warn!(
                "Flow {flow_id} on frontend {:?} executed for {:?} before complete, query: {}",
                peer_desc, elapsed, &plan
            );
            METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY
                .with_label_values(&[
                    flow_id.to_string().as_str(),
                    &peer_desc.unwrap_or_default().to_string(),
                ])
                .observe(elapsed.as_secs_f64());
        }

        self.state
            .write()
            .unwrap()
            .after_query_exec(elapsed, res.is_ok());

        let res = res?;

        Ok(Some((res, elapsed)))
    }

    /// Start executing the query in a loop, breaking when a shutdown signal is received.
    ///
    /// Any error encountered while executing the query is logged.
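    ///
    /// Roughly, each iteration (a summary of the code below):
    /// 1. checks for a shutdown signal,
    /// 2. generates an insert plan covering the dirty time windows (if any),
    /// 3. executes it against a frontend,
    /// 4. then sleeps until the next tick or the computed next start time,
    ///    or for `experimental_min_refresh_duration` when there is no new data or an error occurred.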
    pub async fn start_executing_loop(
        &self,
        engine: QueryEngineRef,
        frontend_client: Arc<FrontendClient>,
    ) {
        let flow_id_str = self.config.flow_id.to_string();
        let mut max_window_cnt = None;
        let mut interval = self
            .config
            .flow_eval_interval
            .map(|d| tokio::time::interval(d));
        if let Some(tick) = &mut interval {
            tick.tick().await; // consume the first tick, which completes immediately
        }
        loop {
            // first check if shutdown signal is received
            // if so, break the loop
            {
                let mut state = self.state.write().unwrap();
                match state.shutdown_rx.try_recv() {
                    Ok(()) => break,
                    Err(TryRecvError::Closed) => {
                        warn!(
                            "Unexpected shutdown flow {}, shutdown anyway",
                            self.config.flow_id
                        );
                        break;
                    }
                    Err(TryRecvError::Empty) => (),
                }
            }
            METRIC_FLOW_BATCHING_ENGINE_START_QUERY_CNT
                .with_label_values(&[&flow_id_str])
                .inc();

            let min_refresh = self.config.batch_opts.experimental_min_refresh_duration;

            let new_query = match self.gen_insert_plan(&engine, max_window_cnt).await {
                Ok(new_query) => new_query,
                Err(err) => {
                    common_telemetry::error!(err; "Failed to generate query for flow={}", self.config.flow_id);
                    // also sleep for a little while before trying again, to avoid flooding the logs
                    tokio::time::sleep(min_refresh).await;
                    continue;
                }
            };

            let res = if let Some(new_query) = &new_query {
                self.execute_logical_plan(&frontend_client, &new_query.plan)
                    .await
            } else {
                Ok(None)
            };

            match res {
                // normal execution; sleep for some time before the next query
                Ok(Some(_)) => {
                    // can increase max_window_cnt to query more windows next time
                    max_window_cnt = max_window_cnt.map(|cnt| {
                        (cnt + 1).min(self.config.batch_opts.experimental_max_filter_num_per_query)
                    });

                    // use proper ticking if an eval interval is set
                    if let Some(eval_interval) = &mut interval {
                        eval_interval.tick().await;
                    } else {
                        // if not explicitly set, automatically calculate the next start time
                        // from the time window size and related options
                        let sleep_until = {
                            let state = self.state.write().unwrap();

                            let time_window_size = self
                                .config
                                .time_window_expr
                                .as_ref()
                                .and_then(|t| *t.time_window_size());

                            state.get_next_start_query_time(
                                self.config.flow_id,
                                &time_window_size,
                                min_refresh,
                                Some(self.config.batch_opts.query_timeout),
                                self.config.batch_opts.experimental_max_filter_num_per_query,
                            )
                        };

                        tokio::time::sleep_until(sleep_until).await;
                    };
                }
                // no new data, sleep for some time before checking for new data
                Ok(None) => {
                    debug!(
                        "Flow id = {:?} found no new data, sleep for {:?} then continue",
                        self.config.flow_id, min_refresh
                    );
                    tokio::time::sleep(min_refresh).await;
                    continue;
                }
                // TODO(discord9): this error should have a better place to go, but for now just print it; more context is also needed
                Err(err) => {
                    METRIC_FLOW_BATCHING_ENGINE_ERROR_CNT
                        .with_label_values(&[&flow_id_str])
                        .inc();
                    match new_query {
                        Some(query) => {
                            common_telemetry::error!(err; "Failed to execute query for flow={} with query: {}", self.config.flow_id, query.plan);
                            // Re-add dirty windows back since query failed
                            self.state.write().unwrap().dirty_time_windows.add_windows(
                                query.filter.map(|f| f.time_ranges).unwrap_or_default(),
                            );
                            // TODO(discord9): add some backoff here? half the query time window or what
                            // backoff meaning use smaller `max_window_cnt` for next query

                            // since last query failed, we should not try to query too many windows
                            max_window_cnt = Some(1);
                        }
                        None => {
                            common_telemetry::error!(err; "Failed to generate query for flow={}", self.config.flow_id)
                        }
                    }
                    // also sleep for a little while before trying again, to avoid flooding the logs
                    tokio::time::sleep(min_refresh).await;
                }
            }
        }
    }

    /// Generate the create table expr for the sink table.
    ///
    /// The auto-created table will automatically have an `update_at` Milliseconds DEFAULT now() column added at the end
    /// (for compatibility with flow streaming mode),
    ///
    /// and it will use the first timestamp column as the time index; all other columns are added as nullable normal columns.
    async fn gen_create_table_expr(
        &self,
        engine: QueryEngineRef,
    ) -> Result<CreateTableExpr, Error> {
        let query_ctx = self.state.read().unwrap().query_ctx.clone();
        let plan =
            sql_to_df_plan(query_ctx.clone(), engine.clone(), &self.config.query, true).await?;
        create_table_with_expr(&plan, &self.config.sink_table_name, &self.config.query_type)
    }

    /// Will merge and use the first ten time windows in the query.
    async fn gen_query_with_time_window(
        &self,
        engine: QueryEngineRef,
        sink_table_schema: &Arc<Schema>,
        max_window_cnt: Option<usize>,
    ) -> Result<Option<PlanInfo>, Error> {
        let query_ctx = self.state.read().unwrap().query_ctx.clone();
        let start = SystemTime::now();
        let since_the_epoch = start
            .duration_since(UNIX_EPOCH)
            .expect("Time went backwards");
        let low_bound = self
            .config
            .expire_after
            .map(|e| since_the_epoch.as_secs() - e as u64)
            .unwrap_or(u64::MIN);

        let low_bound = Timestamp::new_second(low_bound as i64);

        let expire_time_window_bound = self
            .config
            .time_window_expr
            .as_ref()
            .map(|expr| expr.eval(low_bound))
            .transpose()?;

        let (expire_lower_bound, expire_upper_bound) =
            match (expire_time_window_bound, &self.config.query_type) {
                (Some((Some(l), Some(u))), QueryType::Sql) => (l, u),
                (None, QueryType::Sql) => {
                    // if it's a sql query and no time window lower/upper bound is found, just return the original query (with auto columns)
                    // use the sink table schema to add the `update_at` and `__ts_placeholder` columns' values to the query too, for compatibility reasons
                    debug!(
                        "Flow id = {:?}, no time window, using the same query",
                        self.config.flow_id
                    );
                    // clean dirty time windows too; these could come from create flow's check_execute
                    let is_dirty = !self.state.read().unwrap().dirty_time_windows.is_empty();
                    self.state.write().unwrap().dirty_time_windows.clean();

                    if !is_dirty {
                        // no dirty data, hence no need to update
                        debug!("Flow id={:?}, no new data, not update", self.config.flow_id);
                        return Ok(None);
                    }

                    let plan = gen_plan_with_matching_schema(
                        &self.config.query,
                        query_ctx,
                        engine,
                        sink_table_schema.clone(),
                    )
                    .await?;

                    return Ok(Some(PlanInfo { plan, filter: None }));
                }
                _ => {
                    // clean dirty time windows, since tql queries have no use for them
                    self.state.write().unwrap().dirty_time_windows.clean();

                    let plan = gen_plan_with_matching_schema(
                        &self.config.query,
                        query_ctx,
                        engine,
                        sink_table_schema.clone(),
                    )
                    .await?;

                    return Ok(Some(PlanInfo { plan, filter: None }));
                }
            };

        debug!(
            "Flow id = {:?}, found time window: precise_lower_bound={:?}, precise_upper_bound={:?} with dirty time windows: {:?}",
            self.config.flow_id,
            expire_lower_bound,
            expire_upper_bound,
            self.state.read().unwrap().dirty_time_windows
        );
        let window_size = expire_upper_bound
            .sub(&expire_lower_bound)
            .with_context(|| UnexpectedSnafu {
                reason: format!(
                    "Can't get window size from {expire_upper_bound:?} - {expire_lower_bound:?}"
                ),
            })?;
        let col_name = self
            .config
            .time_window_expr
            .as_ref()
            .map(|expr| expr.column_name.clone())
            .with_context(|| UnexpectedSnafu {
                reason: format!(
                    "Flow id={:?}, Failed to get column name from time window expr",
                    self.config.flow_id
                ),
            })?;

        let expr = self
            .state
            .write()
            .unwrap()
            .dirty_time_windows
            .gen_filter_exprs(
                &col_name,
                Some(expire_lower_bound),
                window_size,
                max_window_cnt
                    .unwrap_or(self.config.batch_opts.experimental_max_filter_num_per_query),
                self.config.flow_id,
                Some(self),
            )?;
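
        // The generated filter is roughly of the following form (a sketch with an
        // illustrative time index column `ts` and window bounds; the real bounds come
        // from the dirty time windows merged above):
        //
        //   (ts >= '2024-01-01 00:00:00' AND ts < '2024-01-01 01:00:00')
        //       OR (ts >= '2024-01-01 02:00:00' AND ts < '2024-01-01 03:00:00')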

        debug!(
            "Flow id={:?}, Generated filter expr: {:?}",
            self.config.flow_id,
            expr.as_ref()
                .map(
                    |expr| expr_to_sql(&expr.expr).with_context(|_| DatafusionSnafu {
                        context: format!("Failed to generate filter expr from {expr:?}"),
                    })
                )
                .transpose()?
                .map(|s| s.to_string())
        );

        let Some(expr) = expr else {
            // no new data, hence no need to update
            debug!("Flow id={:?}, no new data, not update", self.config.flow_id);
            return Ok(None);
        };

        let mut add_filter = AddFilterRewriter::new(expr.expr.clone());
        let mut add_auto_column = ColumnMatcherRewriter::new(sink_table_schema.clone());

        let plan =
            sql_to_df_plan(query_ctx.clone(), engine.clone(), &self.config.query, false).await?;
        let rewrite = plan
            .clone()
            .rewrite(&mut add_filter)
            .and_then(|p| p.data.rewrite(&mut add_auto_column))
            .with_context(|_| DatafusionSnafu {
                context: format!("Failed to rewrite plan:\n {}\n", plan),
            })?
            .data;
        // only apply the optimizer after the complex rewrite is done
        let new_plan = apply_df_optimizer(rewrite, &query_ctx).await?;

        let info = PlanInfo {
            plan: new_plan.clone(),
            filter: Some(expr),
        };

        Ok(Some(info))
    }
}

// An auto-created table has an auto-added `update_at` column, and optionally an
// `AUTO_CREATED_PLACEHOLDER_TS_COL` column as a time index placeholder if no timestamp column is specified.
// TODO(discord9): for now no default value is set for the auto-added columns, for compatibility with streaming mode, but this might change in favor of simpler code?
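//
// For example (a sketch based on the test cases at the bottom of this file), the query
// `SELECT number, max(ts) FROM numbers_with_ts GROUP BY number` yields a sink table with:
//   - "number"                  : uint32, nullable, primary key (from the group by clause)
//   - "max(numbers_with_ts.ts)" : timestamp, nullable
//   - AUTO_CREATED_UPDATE_AT_TS_COL   : timestamp, nullable
//   - AUTO_CREATED_PLACEHOLDER_TS_COL : timestamp, not null, time index
//     (added because no timestamp column is part of the group by clause)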
fn create_table_with_expr(
    plan: &LogicalPlan,
    sink_table_name: &[String; 3],
    query_type: &QueryType,
) -> Result<CreateTableExpr, Error> {
    let table_def = match query_type {
        QueryType::Sql => {
            if let Some(def) = build_pk_from_aggr(plan)? {
                def
            } else {
                build_by_sql_schema(plan)?
            }
        }
        QueryType::Tql => {
            // first try building from the aggregation, then from the tql schema, because a tql query might not have an aggr node
            if let Some(table_def) = build_pk_from_aggr(plan)? {
                table_def
            } else {
                build_by_tql_schema(plan)?
            }
        }
    };
    let first_time_stamp = table_def.ts_col;
    let primary_keys = table_def.pks;

    let mut column_schemas = Vec::new();
    for field in plan.schema().fields() {
        let name = field.name();
        let ty = ConcreteDataType::from_arrow_type(field.data_type());
        let col_schema = if first_time_stamp == Some(name.clone()) {
            ColumnSchema::new(name, ty, false).with_time_index(true)
        } else {
            ColumnSchema::new(name, ty, true)
        };

        match query_type {
            QueryType::Sql => {
                column_schemas.push(col_schema);
            }
            QueryType::Tql => {
                // if it's a value column, it needs to be typed as DOUBLE NULL
                // if it's a tag column, its type needs to be cast to STRING NULL
                let is_tag_column = primary_keys.contains(name);
                let is_val_column = !is_tag_column && first_time_stamp.as_ref() != Some(name);
                if is_val_column {
                    let col_schema =
                        ColumnSchema::new(name, ConcreteDataType::float64_datatype(), true);
                    column_schemas.push(col_schema);
                } else if is_tag_column {
                    let col_schema =
                        ColumnSchema::new(name, ConcreteDataType::string_datatype(), true);
                    column_schemas.push(col_schema);
                } else {
                    // time index column
                    column_schemas.push(col_schema);
                }
            }
        }
    }

    if query_type == &QueryType::Sql {
        let update_at_schema = ColumnSchema::new(
            AUTO_CREATED_UPDATE_AT_TS_COL,
            ConcreteDataType::timestamp_millisecond_datatype(),
            true,
        );
        column_schemas.push(update_at_schema);
    }

    let time_index = if let Some(time_index) = first_time_stamp {
        time_index
    } else {
        column_schemas.push(
            ColumnSchema::new(
                AUTO_CREATED_PLACEHOLDER_TS_COL,
                ConcreteDataType::timestamp_millisecond_datatype(),
                false,
            )
            .with_time_index(true),
        );
        AUTO_CREATED_PLACEHOLDER_TS_COL.to_string()
    };

    let column_defs =
        column_schemas_to_defs(column_schemas, &primary_keys).context(ConvertColumnSchemaSnafu)?;
    Ok(CreateTableExpr {
        catalog_name: sink_table_name[0].clone(),
        schema_name: sink_table_name[1].clone(),
        table_name: sink_table_name[2].clone(),
        desc: "Auto created table by flow engine".to_string(),
        column_defs,
        time_index,
        primary_keys,
        create_if_not_exists: true,
        table_options: Default::default(),
        table_id: None,
        engine: "mito".to_string(),
    })
}

/// Simply build from the schema; return the first timestamp column and no primary keys.
fn build_by_sql_schema(plan: &LogicalPlan) -> Result<TableDef, Error> {
    let first_time_stamp = plan.schema().fields().iter().find_map(|f| {
        if ConcreteDataType::from_arrow_type(f.data_type()).is_timestamp() {
            Some(f.name().clone())
        } else {
            None
        }
    });
    Ok(TableDef {
        ts_col: first_time_stamp,
        pks: vec![],
    })
}

/// Return the first timestamp column found in the output schema, plus all string columns as primary keys.
fn build_by_tql_schema(plan: &LogicalPlan) -> Result<TableDef, Error> {
    let first_time_stamp = plan.schema().fields().iter().find_map(|f| {
        if ConcreteDataType::from_arrow_type(f.data_type()).is_timestamp() {
            Some(f.name().clone())
        } else {
            None
        }
    });
    let string_columns = plan
        .schema()
        .fields()
        .iter()
        .filter_map(|f| {
            if ConcreteDataType::from_arrow_type(f.data_type()).is_string() {
                Some(f.name().clone())
            } else {
                None
            }
        })
        .collect::<Vec<_>>();

    Ok(TableDef {
        ts_col: first_time_stamp,
        pks: string_columns,
    })
}

struct TableDef {
    ts_col: Option<String>,
    pks: Vec<String>,
}

/// Return the first timestamp column in the group by clause and the other columns that are also in the group by clause.
///
/// # Returns
///
/// * `TableDef::ts_col` - the first timestamp column in the group by clause
/// * `TableDef::pks` - the other columns that are also in the group by clause
///
/// If no aggregation is found, returns `None`.
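///
/// For example (a sketch using an illustrative query), for
/// `SELECT date_bin('1 hour', ts) AS time_window, host, count(*) FROM t GROUP BY time_window, host`
/// this would return roughly `ts_col = Some("time_window")` and `pks = ["host"]`.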
fn build_pk_from_aggr(plan: &LogicalPlan) -> Result<Option<TableDef>, Error> {
    let fields = plan.schema().fields();
    let mut pk_names = FindGroupByFinalName::default();

    plan.visit(&mut pk_names)
        .with_context(|_| DatafusionSnafu {
            context: format!("Can't find aggr expr in plan {plan:?}"),
        })?;

    // if there is no aggregation at all, return None; if the group by clause is empty,
    // fall back to the first timestamp column found in the output schema
    let Some(pk_final_names) = pk_names.get_group_expr_names() else {
        return Ok(None);
    };
    if pk_final_names.is_empty() {
        let first_ts_col = fields
            .iter()
            .find(|f| ConcreteDataType::from_arrow_type(f.data_type()).is_timestamp())
            .map(|f| f.name().clone());
        return Ok(Some(TableDef {
            ts_col: first_ts_col,
            pks: vec![],
        }));
    }

    let all_pk_cols: Vec<_> = fields
        .iter()
        .filter(|f| pk_final_names.contains(f.name()))
        .map(|f| f.name().clone())
        .collect();
    // the auto-created table uses the first timestamp column in the group by clause as its time index
    let first_time_stamp = fields
        .iter()
        .find(|f| {
            all_pk_cols.contains(&f.name().clone())
                && ConcreteDataType::from_arrow_type(f.data_type()).is_timestamp()
        })
        .map(|f| f.name().clone());

    let all_pk_cols: Vec<_> = all_pk_cols
        .into_iter()
        .filter(|col| first_time_stamp != Some(col.to_string()))
        .collect();

    Ok(Some(TableDef {
        ts_col: first_time_stamp,
        pks: all_pk_cols,
    }))
}

#[cfg(test)]
mod test {
    use api::v1::column_def::try_as_column_schema;
    use pretty_assertions::assert_eq;
    use session::context::QueryContext;

    use super::*;
    use crate::test_utils::create_test_query_engine;

    #[tokio::test]
    async fn test_gen_create_table_sql() {
        let query_engine = create_test_query_engine();
        let ctx = QueryContext::arc();
        struct TestCase {
            sql: String,
            sink_table_name: String,
            column_schemas: Vec<ColumnSchema>,
            primary_keys: Vec<String>,
            time_index: String,
        }

        let update_at_schema = ColumnSchema::new(
            AUTO_CREATED_UPDATE_AT_TS_COL,
            ConcreteDataType::timestamp_millisecond_datatype(),
            true,
        );

        let ts_placeholder_schema = ColumnSchema::new(
            AUTO_CREATED_PLACEHOLDER_TS_COL,
            ConcreteDataType::timestamp_millisecond_datatype(),
            false,
        )
        .with_time_index(true);

        let testcases = vec![
            TestCase {
                sql: "SELECT number, ts FROM numbers_with_ts".to_string(),
                sink_table_name: "new_table".to_string(),
                column_schemas: vec![
                    ColumnSchema::new("number", ConcreteDataType::uint32_datatype(), true),
                    ColumnSchema::new(
                        "ts",
                        ConcreteDataType::timestamp_millisecond_datatype(),
                        false,
                    )
                    .with_time_index(true),
                    update_at_schema.clone(),
                ],
                primary_keys: vec![],
                time_index: "ts".to_string(),
            },
            TestCase {
                sql: "SELECT number, max(ts) FROM numbers_with_ts GROUP BY number".to_string(),
                sink_table_name: "new_table".to_string(),
                column_schemas: vec![
                    ColumnSchema::new("number", ConcreteDataType::uint32_datatype(), true),
                    ColumnSchema::new(
                        "max(numbers_with_ts.ts)",
                        ConcreteDataType::timestamp_millisecond_datatype(),
                        true,
                    ),
                    update_at_schema.clone(),
                    ts_placeholder_schema.clone(),
                ],
                primary_keys: vec!["number".to_string()],
                time_index: AUTO_CREATED_PLACEHOLDER_TS_COL.to_string(),
            },
            TestCase {
                sql: "SELECT max(number), ts FROM numbers_with_ts GROUP BY ts".to_string(),
                sink_table_name: "new_table".to_string(),
                column_schemas: vec![
                    ColumnSchema::new(
                        "max(numbers_with_ts.number)",
                        ConcreteDataType::uint32_datatype(),
                        true,
                    ),
                    ColumnSchema::new(
                        "ts",
                        ConcreteDataType::timestamp_millisecond_datatype(),
                        false,
                    )
                    .with_time_index(true),
                    update_at_schema.clone(),
                ],
                primary_keys: vec![],
                time_index: "ts".to_string(),
            },
            TestCase {
                sql: "SELECT number, ts FROM numbers_with_ts GROUP BY ts, number".to_string(),
                sink_table_name: "new_table".to_string(),
                column_schemas: vec![
                    ColumnSchema::new("number", ConcreteDataType::uint32_datatype(), true),
                    ColumnSchema::new(
                        "ts",
                        ConcreteDataType::timestamp_millisecond_datatype(),
                        false,
                    )
                    .with_time_index(true),
                    update_at_schema.clone(),
                ],
                primary_keys: vec!["number".to_string()],
                time_index: "ts".to_string(),
            },
        ];

        for tc in testcases {
            let plan = sql_to_df_plan(ctx.clone(), query_engine.clone(), &tc.sql, true)
                .await
                .unwrap();
            let expr = create_table_with_expr(
                &plan,
                &[
                    "greptime".to_string(),
                    "public".to_string(),
                    tc.sink_table_name.clone(),
                ],
                &QueryType::Sql,
            )
            .unwrap();
            // TODO(discord9): assert expr
            let column_schemas = expr
                .column_defs
                .iter()
                .map(|c| try_as_column_schema(c).unwrap())
                .collect::<Vec<_>>();
            assert_eq!(tc.column_schemas, column_schemas, "{:?}", tc.sql);
            assert_eq!(tc.primary_keys, expr.primary_keys, "{:?}", tc.sql);
            assert_eq!(tc.time_index, expr.time_index, "{:?}", tc.sql);
        }
    }
}