flow/batching_mode/
task.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeSet, HashMap, HashSet};
16use std::sync::{Arc, RwLock};
17use std::time::{Duration, SystemTime, UNIX_EPOCH};
18
19use api::v1::CreateTableExpr;
20use catalog::CatalogManagerRef;
21use common_error::ext::BoxedError;
22use common_query::logical_plan::breakup_insert_plan;
23use common_telemetry::tracing::warn;
24use common_telemetry::{debug, info};
25use common_time::Timestamp;
26use datafusion::datasource::DefaultTableSource;
27use datafusion::sql::unparser::expr_to_sql;
28use datafusion_common::DFSchemaRef;
29use datafusion_common::tree_node::{Transformed, TreeNode};
30use datafusion_expr::{DmlStatement, LogicalPlan, WriteOp};
31use datatypes::prelude::ConcreteDataType;
32use datatypes::schema::{ColumnSchema, Schema};
33use operator::expr_helper::column_schemas_to_defs;
34use query::QueryEngineRef;
35use query::query_engine::DefaultSerializer;
36use session::context::QueryContextRef;
37use snafu::{OptionExt, ResultExt, ensure};
38use sql::parsers::utils::is_tql;
39use store_api::mito_engine_options::MERGE_MODE_KEY;
40use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
41use table::table::adapter::DfTableProviderAdapter;
42use tokio::sync::oneshot;
43use tokio::sync::oneshot::error::TryRecvError;
44use tokio::time::Instant;
45
46use crate::adapter::{AUTO_CREATED_PLACEHOLDER_TS_COL, AUTO_CREATED_UPDATE_AT_TS_COL};
47use crate::batching_mode::BatchingModeOptions;
48use crate::batching_mode::frontend_client::FrontendClient;
49use crate::batching_mode::state::{FilterExprInfo, TaskState};
50use crate::batching_mode::time_window::TimeWindowExpr;
51use crate::batching_mode::utils::{
52    AddFilterRewriter, ColumnMatcherRewriter, FindGroupByFinalName, gen_plan_with_matching_schema,
53    get_table_info_df_schema, sql_to_df_plan,
54};
55use crate::df_optimizer::apply_df_optimizer;
56use crate::error::{
57    ConvertColumnSchemaSnafu, DatafusionSnafu, ExternalSnafu, InvalidQuerySnafu,
58    SubstraitEncodeLogicalPlanSnafu, UnexpectedSnafu,
59};
60use crate::metrics::{
61    METRIC_FLOW_BATCHING_ENGINE_ERROR_CNT, METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME,
62    METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY, METRIC_FLOW_BATCHING_ENGINE_START_QUERY_CNT,
63    METRIC_FLOW_ROWS,
64};
65use crate::{Error, FlowId};
66
/// The task's config, immutable once created
#[derive(Clone)]
pub struct TaskConfig {
    /// Id of the flow this task belongs to
    pub flow_id: FlowId,
    /// The flow's defining query text
    pub query: String,
    /// output schema of the query
    pub output_schema: DFSchemaRef,
    /// Expression used to evaluate the query's time window bounds, if any
    pub time_window_expr: Option<TimeWindowExpr>,
    /// in seconds
    pub expire_after: Option<i64>,
    /// Fully qualified sink table name as `[catalog, schema, table]`
    pub sink_table_name: [String; 3],
    /// Fully qualified names of all source tables referenced by the query
    pub source_table_names: HashSet<[String; 3]>,
    /// Used to look up table metadata (e.g. whether the sink table exists)
    pub catalog_manager: CatalogManagerRef,
    /// Whether `query` is SQL or TQL
    pub query_type: QueryType,
    /// Options shared by all batching-mode tasks
    pub batch_opts: Arc<BatchingModeOptions>,
    /// Explicit evaluation interval; when `None` the next run time is derived
    /// from the time window size and task state instead
    pub flow_eval_interval: Option<Duration>,
}
84
85fn determine_query_type(query: &str, query_ctx: &QueryContextRef) -> Result<QueryType, Error> {
86    let is_tql = is_tql(query_ctx.sql_dialect(), query)
87        .map_err(BoxedError::new)
88        .context(ExternalSnafu)?;
89    Ok(if is_tql {
90        QueryType::Tql
91    } else {
92        QueryType::Sql
93    })
94}
95
96fn is_merge_mode_last_non_null(options: &HashMap<String, String>) -> bool {
97    options
98        .get(MERGE_MODE_KEY)
99        .map(|mode| mode.eq_ignore_ascii_case("last_non_null"))
100        .unwrap_or(false)
101}
102
/// The kind of query that defines a flow
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum QueryType {
    /// query is a tql query
    Tql,
    /// query is a sql query
    Sql,
}
110
/// A batching-mode flow task: immutable config plus shared mutable state.
/// Cloning is cheap (both fields are `Arc`s) and clones share the same state.
#[derive(Clone)]
pub struct BatchingTask {
    /// Immutable once created, shared between clones
    pub config: Arc<TaskConfig>,
    /// Mutable state (dirty time windows, query context, shutdown receiver, ...)
    pub state: Arc<RwLock<TaskState>>,
}
116
/// Arguments for creating batching task
pub struct TaskArgs<'a> {
    pub flow_id: FlowId,
    /// The flow's defining query text
    pub query: &'a str,
    /// Logical plan of `query`; only its schema is kept in the task's config
    pub plan: LogicalPlan,
    pub time_window_expr: Option<TimeWindowExpr>,
    /// in seconds
    pub expire_after: Option<i64>,
    /// Fully qualified sink table name as `[catalog, schema, table]`
    pub sink_table_name: [String; 3],
    /// Fully qualified source table names (deduplicated into a set on creation)
    pub source_table_names: Vec<[String; 3]>,
    pub query_ctx: QueryContextRef,
    pub catalog_manager: CatalogManagerRef,
    /// Receiver used by the execution loop to observe shutdown requests
    pub shutdown_rx: oneshot::Receiver<()>,
    pub batch_opts: Arc<BatchingModeOptions>,
    pub flow_eval_interval: Option<Duration>,
}
132
/// A generated logical plan together with the time-window filter (if any)
/// that was applied when producing it.
pub struct PlanInfo {
    pub plan: LogicalPlan,
    /// Filter restricting the query to dirty time windows; `None` when no
    /// time-window filter was applied (e.g. TQL or no time window found)
    pub filter: Option<FilterExprInfo>,
}
137
138impl BatchingTask {
139    #[allow(clippy::too_many_arguments)]
140    pub fn try_new(
141        TaskArgs {
142            flow_id,
143            query,
144            plan,
145            time_window_expr,
146            expire_after,
147            sink_table_name,
148            source_table_names,
149            query_ctx,
150            catalog_manager,
151            shutdown_rx,
152            batch_opts,
153            flow_eval_interval,
154        }: TaskArgs<'_>,
155    ) -> Result<Self, Error> {
156        Ok(Self {
157            config: Arc::new(TaskConfig {
158                flow_id,
159                query: query.to_string(),
160                time_window_expr,
161                expire_after,
162                sink_table_name,
163                source_table_names: source_table_names.into_iter().collect(),
164                catalog_manager,
165                output_schema: plan.schema().clone(),
166                query_type: determine_query_type(query, &query_ctx)?,
167                batch_opts,
168                flow_eval_interval,
169            }),
170            state: Arc::new(RwLock::new(TaskState::new(query_ctx, shutdown_rx))),
171        })
172    }
173
    /// Unix-millisecond timestamp of the last query execution, if any
    /// (delegates to the task state).
    pub fn last_execution_time_millis(&self) -> Option<i64> {
        self.state.read().unwrap().last_execution_time_millis()
    }
177
    /// mark time window range (now - expire_after, now) as dirty (or (0, now) if expire_after not set)
    ///
    /// useful for flush_flow to flush dirty time windows range
    pub fn mark_all_windows_as_dirty(&self) -> Result<(), Error> {
        let now = SystemTime::now();
        // wall-clock "now", truncated to second precision
        let now = Timestamp::new_second(
            now.duration_since(UNIX_EPOCH)
                .expect("Time went backwards")
                .as_secs() as _,
        );
        // lower bound = now - expire_after; falls back to timestamp 0 when no
        // expiry is configured (i.e. everything is considered dirty)
        let lower_bound = self
            .config
            .expire_after
            .map(|e| now.sub_duration(Duration::from_secs(e as _)))
            .transpose()
            .map_err(BoxedError::new)
            .context(ExternalSnafu)?
            .unwrap_or(Timestamp::new_second(0));
        debug!(
            "Flow {} mark range ({:?}, {:?}) as dirty",
            self.config.flow_id, lower_bound, now
        );
        self.state
            .write()
            .unwrap()
            .dirty_time_windows
            .add_window(lower_bound, Some(now));
        Ok(())
    }
207
    /// Create sink table if not exists
    ///
    /// Always returns `Ok(None)`: this path performs no query execution, the
    /// return type merely matches the other task entry points.
    pub async fn check_or_create_sink_table(
        &self,
        engine: &QueryEngineRef,
        frontend_client: &Arc<FrontendClient>,
    ) -> Result<Option<(u32, Duration)>, Error> {
        // only issue the DDL when the sink table is currently absent
        if !self.is_table_exist(&self.config.sink_table_name).await? {
            // derive the sink table schema from the flow's query plan
            let create_table = self.gen_create_table_expr(engine.clone()).await?;
            info!(
                "Try creating sink table(if not exists) with expr: {:?}",
                create_table
            );
            self.create_table(frontend_client, create_table).await?;
            info!(
                "Sink table {}(if not exists) created",
                self.config.sink_table_name.join(".")
            );
        }

        Ok(None)
    }
229
    /// Check via the catalog manager whether `table_name`
    /// (`[catalog, schema, table]`) exists.
    async fn is_table_exist(&self, table_name: &[String; 3]) -> Result<bool, Error> {
        self.config
            .catalog_manager
            .table_exists(&table_name[0], &table_name[1], &table_name[2], None)
            .await
            .map_err(BoxedError::new)
            .context(ExternalSnafu)
    }
238
239    pub async fn gen_exec_once(
240        &self,
241        engine: &QueryEngineRef,
242        frontend_client: &Arc<FrontendClient>,
243        max_window_cnt: Option<usize>,
244    ) -> Result<Option<(u32, Duration)>, Error> {
245        if let Some(new_query) = self.gen_insert_plan(engine, max_window_cnt).await? {
246            debug!("Generate new query: {}", new_query.plan);
247            self.execute_logical_plan(frontend_client, &new_query.plan)
248                .await
249        } else {
250            debug!("Generate no query");
251            Ok(None)
252        }
253    }
254
    /// Generate the `INSERT INTO <sink table>` logical plan for the current
    /// dirty time windows.
    ///
    /// Returns `Ok(None)` when there is no new data to process; otherwise the
    /// returned plan is a DML append into the sink table wrapping the
    /// (possibly time-window-filtered) flow query, together with the filter
    /// that was applied (if any).
    pub async fn gen_insert_plan(
        &self,
        engine: &QueryEngineRef,
        max_window_cnt: Option<usize>,
    ) -> Result<Option<PlanInfo>, Error> {
        let (table, df_schema) = get_table_info_df_schema(
            self.config.catalog_manager.clone(),
            self.config.sink_table_name.clone(),
        )
        .await?;

        let table_meta = &table.table_info().meta;
        // `last_non_null` merge mode is forwarded downstream as the
        // `allow_partial` flag of the query generator
        let merge_mode_last_non_null =
            is_merge_mode_last_non_null(&table_meta.options.extra_options);
        let primary_key_indices = table_meta.primary_key_indices.clone();

        let new_query = self
            .gen_query_with_time_window(
                engine.clone(),
                &table.table_info().meta.schema,
                &primary_key_indices,
                merge_mode_last_non_null,
                max_window_cnt,
            )
            .await?;

        let insert_into_info = if let Some(new_query) = new_query {
            // first check if all columns in input query exists in sink table
            // since insert into ref to names in record batch generate by given query
            let table_columns = df_schema
                .columns()
                .into_iter()
                .map(|c| c.name)
                .collect::<BTreeSet<_>>();
            for column in new_query.plan.schema().columns() {
                ensure!(
                    table_columns.contains(column.name()),
                    InvalidQuerySnafu {
                        reason: format!(
                            "Column {} not found in sink table with columns {:?}",
                            column, table_columns
                        ),
                    }
                );
            }

            let table_provider = Arc::new(DfTableProviderAdapter::new(table));
            let table_source = Arc::new(DefaultTableSource::new(table_provider));

            // update_at& time index placeholder (if exists) should have default value
            let plan = LogicalPlan::Dml(DmlStatement::new(
                datafusion_common::TableReference::Full {
                    catalog: self.config.sink_table_name[0].clone().into(),
                    schema: self.config.sink_table_name[1].clone().into(),
                    table: self.config.sink_table_name[2].clone().into(),
                },
                table_source,
                WriteOp::Insert(datafusion_expr::dml::InsertOp::Append),
                Arc::new(new_query.plan),
            ));
            PlanInfo {
                plan,
                filter: new_query.filter,
            }
        } else {
            return Ok(None);
        };
        // recompute the schema after wrapping the query in the DML node so the
        // plan's reported schema stays consistent
        let insert_into = insert_into_info
            .plan
            .recompute_schema()
            .context(DatafusionSnafu {
                context: "Failed to recompute schema",
            })?;

        Ok(Some(PlanInfo {
            plan: insert_into,
            filter: insert_into_info.filter,
        }))
    }
334
335    pub async fn create_table(
336        &self,
337        frontend_client: &Arc<FrontendClient>,
338        expr: CreateTableExpr,
339    ) -> Result<(), Error> {
340        let catalog = &self.config.sink_table_name[0];
341        let schema = &self.config.sink_table_name[1];
342        frontend_client
343            .create(expr.clone(), catalog, schema)
344            .await?;
345        Ok(())
346    }
347
    /// Execute `plan` on a frontend, returning `(affected_rows, elapsed)`.
    ///
    /// Table references are first made fully qualified using the sink table's
    /// catalog/schema. Insert plans are special-cased into an `InsertIntoPlan`
    /// request; everything else is sent as a substrait-encoded logical plan.
    /// Failures and slow queries are logged and recorded in metrics, and the
    /// task state is updated with the outcome either way.
    pub async fn execute_logical_plan(
        &self,
        frontend_client: &Arc<FrontendClient>,
        plan: &LogicalPlan,
    ) -> Result<Option<(u32, Duration)>, Error> {
        let instant = Instant::now();
        let flow_id = self.config.flow_id;

        debug!(
            "Executing flow {flow_id}(expire_after={:?} secs) with query {}",
            self.config.expire_after, &plan
        );

        let catalog = &self.config.sink_table_name[0];
        let schema = &self.config.sink_table_name[1];

        // fix all table ref by make it fully qualified, i.e. "table_name" => "catalog_name.schema_name.table_name"
        let plan = plan
            .clone()
            .transform_down_with_subqueries(|p| {
                if let LogicalPlan::TableScan(mut table_scan) = p {
                    let resolved = table_scan.table_name.resolve(catalog, schema);
                    table_scan.table_name = resolved.into();
                    Ok(Transformed::yes(LogicalPlan::TableScan(table_scan)))
                } else {
                    Ok(Transformed::no(p))
                }
            })
            .with_context(|_| DatafusionSnafu {
                context: format!("Failed to fix table ref in logical plan, plan={:?}", plan),
            })?
            .data;

        // filled in by `frontend_client.handle` with the peer that served the
        // request; used only for logging/metrics below
        let mut peer_desc = None;

        let res = {
            // times the whole request round-trip for this flow
            let _timer = METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME
                .with_label_values(&[flow_id.to_string().as_str()])
                .start_timer();

            // hack and special handling the insert logical plan
            let req = if let Some((insert_to, insert_plan)) =
                breakup_insert_plan(&plan, catalog, schema)
            {
                let message = DFLogicalSubstraitConvertor {}
                    .encode(&insert_plan, DefaultSerializer)
                    .context(SubstraitEncodeLogicalPlanSnafu)?;
                api::v1::greptime_request::Request::Query(api::v1::QueryRequest {
                    query: Some(api::v1::query_request::Query::InsertIntoPlan(
                        api::v1::InsertIntoPlan {
                            table_name: Some(insert_to),
                            logical_plan: message.to_vec(),
                        },
                    )),
                })
            } else {
                let message = DFLogicalSubstraitConvertor {}
                    .encode(&plan, DefaultSerializer)
                    .context(SubstraitEncodeLogicalPlanSnafu)?;

                api::v1::greptime_request::Request::Query(api::v1::QueryRequest {
                    query: Some(api::v1::query_request::Query::LogicalPlan(message.to_vec())),
                })
            };

            frontend_client
                .handle(req, catalog, schema, &mut peer_desc)
                .await
        };

        let elapsed = instant.elapsed();
        if let Ok(affected_rows) = &res {
            debug!(
                "Flow {flow_id} executed, affected_rows: {affected_rows:?}, elapsed: {:?}",
                elapsed
            );
            METRIC_FLOW_ROWS
                .with_label_values(&[format!("{}-out-batching", flow_id).as_str()])
                .inc_by(*affected_rows as _);
        } else if let Err(err) = &res {
            warn!(
                "Failed to execute Flow {flow_id} on frontend {:?}, result: {err:?}, elapsed: {:?} with query: {}",
                peer_desc, elapsed, &plan
            );
        }

        // record slow query
        if elapsed >= self.config.batch_opts.slow_query_threshold {
            warn!(
                "Flow {flow_id} on frontend {:?} executed for {:?} before complete, query: {}",
                peer_desc, elapsed, &plan
            );
            METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY
                .with_label_values(&[
                    flow_id.to_string().as_str(),
                    &peer_desc.unwrap_or_default().to_string(),
                ])
                .observe(elapsed.as_secs_f64());
        }

        // record the execution outcome in task state before propagating errors
        self.state
            .write()
            .unwrap()
            .after_query_exec(elapsed, res.is_ok());

        let res = res?;

        Ok(Some((res, elapsed)))
    }
457
    /// start executing query in a loop, break when receive shutdown signal
    ///
    /// any error will be logged when executing query
    pub async fn start_executing_loop(
        &self,
        engine: QueryEngineRef,
        frontend_client: Arc<FrontendClient>,
    ) {
        let flow_id_str = self.config.flow_id.to_string();
        // max dirty windows to query per round; `None` means "use the
        // configured default". Grows by 1 after each success (capped by
        // config) and resets to 1 after a failed query.
        let mut max_window_cnt = None;
        let mut interval = self
            .config
            .flow_eval_interval
            .map(|d| tokio::time::interval(d));
        if let Some(tick) = &mut interval {
            tick.tick().await; // pass the first tick immediately
        }
        loop {
            // first check if shutdown signal is received
            // if so, break the loop
            {
                let mut state = self.state.write().unwrap();
                match state.shutdown_rx.try_recv() {
                    Ok(()) => break,
                    Err(TryRecvError::Closed) => {
                        // sender dropped without an explicit signal; treat it
                        // as a shutdown but log it as unexpected
                        warn!(
                            "Unexpected shutdown flow {}, shutdown anyway",
                            self.config.flow_id
                        );
                        break;
                    }
                    Err(TryRecvError::Empty) => (),
                }
            }
            METRIC_FLOW_BATCHING_ENGINE_START_QUERY_CNT
                .with_label_values(&[&flow_id_str])
                .inc();

            // minimum pause between attempts; also used as the error backoff
            let min_refresh = self.config.batch_opts.experimental_min_refresh_duration;

            let new_query = match self.gen_insert_plan(&engine, max_window_cnt).await {
                Ok(new_query) => new_query,
                Err(err) => {
                    common_telemetry::error!(err; "Failed to generate query for flow={}", self.config.flow_id);
                    // also sleep for a little while before try again to prevent flooding logs
                    tokio::time::sleep(min_refresh).await;
                    continue;
                }
            };

            let res = if let Some(new_query) = &new_query {
                self.execute_logical_plan(&frontend_client, &new_query.plan)
                    .await
            } else {
                Ok(None)
            };

            match res {
                // normal execute, sleep for some time before doing next query
                Ok(Some(_)) => {
                    // can increase max_window_cnt to query more windows next time
                    max_window_cnt = max_window_cnt.map(|cnt| {
                        (cnt + 1).min(self.config.batch_opts.experimental_max_filter_num_per_query)
                    });

                    // here use proper ticking if set eval interval
                    if let Some(eval_interval) = &mut interval {
                        eval_interval.tick().await;
                    } else {
                        // if not explicitly set, just automatically calculate next start time
                        // using time window size and more args
                        let sleep_until = {
                            let state = self.state.write().unwrap();

                            let time_window_size = self
                                .config
                                .time_window_expr
                                .as_ref()
                                .and_then(|t| *t.time_window_size());

                            state.get_next_start_query_time(
                                self.config.flow_id,
                                &time_window_size,
                                min_refresh,
                                Some(self.config.batch_opts.query_timeout),
                                self.config.batch_opts.experimental_max_filter_num_per_query,
                            )
                        };

                        tokio::time::sleep_until(sleep_until).await;
                    };
                }
                // no new data, sleep for some time before checking for new data
                Ok(None) => {
                    debug!(
                        "Flow id = {:?} found no new data, sleep for {:?} then continue",
                        self.config.flow_id, min_refresh
                    );
                    tokio::time::sleep(min_refresh).await;
                    continue;
                }
                // TODO(discord9): this error should have better place to go, but for now just print error, also more context is needed
                Err(err) => {
                    METRIC_FLOW_BATCHING_ENGINE_ERROR_CNT
                        .with_label_values(&[&flow_id_str])
                        .inc();
                    match new_query {
                        Some(query) => {
                            common_telemetry::error!(err; "Failed to execute query for flow={} with query: {}", self.config.flow_id, query.plan);
                            // Re-add dirty windows back since query failed
                            self.state.write().unwrap().dirty_time_windows.add_windows(
                                query.filter.map(|f| f.time_ranges).unwrap_or_default(),
                            );
                            // TODO(discord9): add some backoff here? half the query time window or what
                            // backoff meaning use smaller `max_window_cnt` for next query

                            // since last query failed, we should not try to query too many windows
                            max_window_cnt = Some(1);
                        }
                        None => {
                            common_telemetry::error!(err; "Failed to generate query for flow={}", self.config.flow_id)
                        }
                    }
                    // also sleep for a little while before try again to prevent flooding logs
                    tokio::time::sleep(min_refresh).await;
                }
            }
        }
    }
587
    /// Generate the create table SQL
    ///
    /// the auto created table will automatically added a `update_at` Milliseconds DEFAULT now() column in the end
    /// (for compatibility with flow streaming mode)
    ///
    /// and it will use first timestamp column as time index, all other columns will be added as normal columns and nullable
    async fn gen_create_table_expr(
        &self,
        engine: QueryEngineRef,
    ) -> Result<CreateTableExpr, Error> {
        let query_ctx = self.state.read().unwrap().query_ctx.clone();
        // plan the flow's query to derive the sink table's schema from the
        // plan's output schema
        let plan =
            sql_to_df_plan(query_ctx.clone(), engine.clone(), &self.config.query, true).await?;
        create_table_with_expr(&plan, &self.config.sink_table_name, &self.config.query_type)
    }
603
604    /// will merge and use the first ten time window in query
605    async fn gen_query_with_time_window(
606        &self,
607        engine: QueryEngineRef,
608        sink_table_schema: &Arc<Schema>,
609        primary_key_indices: &[usize],
610        allow_partial: bool,
611        max_window_cnt: Option<usize>,
612    ) -> Result<Option<PlanInfo>, Error> {
613        let query_ctx = self.state.read().unwrap().query_ctx.clone();
614        let start = SystemTime::now();
615        let since_the_epoch = start
616            .duration_since(UNIX_EPOCH)
617            .expect("Time went backwards");
618        let low_bound = self
619            .config
620            .expire_after
621            .map(|e| since_the_epoch.as_secs() - e as u64)
622            .unwrap_or(u64::MIN);
623
624        let low_bound = Timestamp::new_second(low_bound as i64);
625
626        let expire_time_window_bound = self
627            .config
628            .time_window_expr
629            .as_ref()
630            .map(|expr| expr.eval(low_bound))
631            .transpose()?;
632
633        let (expire_lower_bound, expire_upper_bound) =
634            match (expire_time_window_bound, &self.config.query_type) {
635                (Some((Some(l), Some(u))), QueryType::Sql) => (l, u),
636                (None, QueryType::Sql) => {
637                    // if it's sql query and no time window lower/upper bound is found, just return the original query(with auto columns)
638                    // use sink_table_meta to add to query the `update_at` and `__ts_placeholder` column's value too for compatibility reason
639                    debug!(
640                        "Flow id = {:?}, no time window, using the same query",
641                        self.config.flow_id
642                    );
643                    // clean dirty time window too, this could be from create flow's check_execute
644                    let is_dirty = !self.state.read().unwrap().dirty_time_windows.is_empty();
645                    self.state.write().unwrap().dirty_time_windows.clean();
646
647                    if !is_dirty {
648                        // no dirty data, hence no need to update
649                        debug!("Flow id={:?}, no new data, not update", self.config.flow_id);
650                        return Ok(None);
651                    }
652
653                    let plan = gen_plan_with_matching_schema(
654                        &self.config.query,
655                        query_ctx,
656                        engine,
657                        sink_table_schema.clone(),
658                        primary_key_indices,
659                        allow_partial,
660                    )
661                    .await?;
662
663                    return Ok(Some(PlanInfo { plan, filter: None }));
664                }
665                _ => {
666                    // clean for tql have no use for time window
667                    self.state.write().unwrap().dirty_time_windows.clean();
668
669                    let plan = gen_plan_with_matching_schema(
670                        &self.config.query,
671                        query_ctx,
672                        engine,
673                        sink_table_schema.clone(),
674                        primary_key_indices,
675                        allow_partial,
676                    )
677                    .await?;
678
679                    return Ok(Some(PlanInfo { plan, filter: None }));
680                }
681            };
682
683        debug!(
684            "Flow id = {:?}, found time window: precise_lower_bound={:?}, precise_upper_bound={:?} with dirty time windows: {:?}",
685            self.config.flow_id,
686            expire_lower_bound,
687            expire_upper_bound,
688            self.state.read().unwrap().dirty_time_windows
689        );
690        let window_size = expire_upper_bound
691            .sub(&expire_lower_bound)
692            .with_context(|| UnexpectedSnafu {
693                reason: format!(
694                    "Can't get window size from {expire_upper_bound:?} - {expire_lower_bound:?}"
695                ),
696            })?;
697        let col_name = self
698            .config
699            .time_window_expr
700            .as_ref()
701            .map(|expr| expr.column_name.clone())
702            .with_context(|| UnexpectedSnafu {
703                reason: format!(
704                    "Flow id={:?}, Failed to get column name from time window expr",
705                    self.config.flow_id
706                ),
707            })?;
708
709        let expr = self
710            .state
711            .write()
712            .unwrap()
713            .dirty_time_windows
714            .gen_filter_exprs(
715                &col_name,
716                Some(expire_lower_bound),
717                window_size,
718                max_window_cnt
719                    .unwrap_or(self.config.batch_opts.experimental_max_filter_num_per_query),
720                self.config.flow_id,
721                Some(self),
722            )?;
723
724        debug!(
725            "Flow id={:?}, Generated filter expr: {:?}",
726            self.config.flow_id,
727            expr.as_ref()
728                .map(
729                    |expr| expr_to_sql(&expr.expr).with_context(|_| DatafusionSnafu {
730                        context: format!("Failed to generate filter expr from {expr:?}"),
731                    })
732                )
733                .transpose()?
734                .map(|s| s.to_string())
735        );
736
737        let Some(expr) = expr else {
738            // no new data, hence no need to update
739            debug!("Flow id={:?}, no new data, not update", self.config.flow_id);
740            return Ok(None);
741        };
742
743        let mut add_filter = AddFilterRewriter::new(expr.expr.clone());
744        let mut add_auto_column = ColumnMatcherRewriter::new(
745            sink_table_schema.clone(),
746            primary_key_indices.to_vec(),
747            allow_partial,
748        );
749
750        let plan =
751            sql_to_df_plan(query_ctx.clone(), engine.clone(), &self.config.query, false).await?;
752        let rewrite = plan
753            .clone()
754            .rewrite(&mut add_filter)
755            .and_then(|p| p.data.rewrite(&mut add_auto_column))
756            .with_context(|_| DatafusionSnafu {
757                context: format!("Failed to rewrite plan:\n {}\n", plan),
758            })?
759            .data;
760        // only apply optimize after complex rewrite is done
761        let new_plan = apply_df_optimizer(rewrite, &query_ctx).await?;
762
763        let info = PlanInfo {
764            plan: new_plan.clone(),
765            filter: Some(expr),
766        };
767
768        Ok(Some(info))
769    }
770}
771
772// auto created table have a auto added column `update_at`, and optional have a `AUTO_CREATED_PLACEHOLDER_TS_COL` column for time index placeholder if no timestamp column is specified
773// TODO(discord9): for now no default value is set for auto added column for compatibility reason with streaming mode, but this might change in favor of simpler code?
774fn create_table_with_expr(
775    plan: &LogicalPlan,
776    sink_table_name: &[String; 3],
777    query_type: &QueryType,
778) -> Result<CreateTableExpr, Error> {
779    let table_def = match query_type {
780        &QueryType::Sql => {
781            if let Some(def) = build_pk_from_aggr(plan)? {
782                def
783            } else {
784                build_by_sql_schema(plan)?
785            }
786        }
787        QueryType::Tql => {
788            // first try build from aggr, then from tql schema because tql query might not have aggr node
789            if let Some(table_def) = build_pk_from_aggr(plan)? {
790                table_def
791            } else {
792                build_by_tql_schema(plan)?
793            }
794        }
795    };
796    let first_time_stamp = table_def.ts_col;
797    let primary_keys = table_def.pks;
798
799    let mut column_schemas = Vec::new();
800    for field in plan.schema().fields() {
801        let name = field.name();
802        let ty = ConcreteDataType::from_arrow_type(field.data_type());
803        let col_schema = if first_time_stamp == Some(name.clone()) {
804            ColumnSchema::new(name, ty, false).with_time_index(true)
805        } else {
806            ColumnSchema::new(name, ty, true)
807        };
808
809        match query_type {
810            QueryType::Sql => {
811                column_schemas.push(col_schema);
812            }
813            QueryType::Tql => {
814                // if is val column, need to rename as val DOUBLE NULL
815                // if is tag column, need to cast type as STRING NULL
816                let is_tag_column = primary_keys.contains(name);
817                let is_val_column = !is_tag_column && first_time_stamp.as_ref() != Some(name);
818                if is_val_column {
819                    let col_schema =
820                        ColumnSchema::new(name, ConcreteDataType::float64_datatype(), true);
821                    column_schemas.push(col_schema);
822                } else if is_tag_column {
823                    let col_schema =
824                        ColumnSchema::new(name, ConcreteDataType::string_datatype(), true);
825                    column_schemas.push(col_schema);
826                } else {
827                    // time index column
828                    column_schemas.push(col_schema);
829                }
830            }
831        }
832    }
833
834    if query_type == &QueryType::Sql {
835        let update_at_schema = ColumnSchema::new(
836            AUTO_CREATED_UPDATE_AT_TS_COL,
837            ConcreteDataType::timestamp_millisecond_datatype(),
838            true,
839        );
840        column_schemas.push(update_at_schema);
841    }
842
843    let time_index = if let Some(time_index) = first_time_stamp {
844        time_index
845    } else {
846        column_schemas.push(
847            ColumnSchema::new(
848                AUTO_CREATED_PLACEHOLDER_TS_COL,
849                ConcreteDataType::timestamp_millisecond_datatype(),
850                false,
851            )
852            .with_time_index(true),
853        );
854        AUTO_CREATED_PLACEHOLDER_TS_COL.to_string()
855    };
856
857    let column_defs =
858        column_schemas_to_defs(column_schemas, &primary_keys).context(ConvertColumnSchemaSnafu)?;
859    Ok(CreateTableExpr {
860        catalog_name: sink_table_name[0].clone(),
861        schema_name: sink_table_name[1].clone(),
862        table_name: sink_table_name[2].clone(),
863        desc: "Auto created table by flow engine".to_string(),
864        column_defs,
865        time_index,
866        primary_keys,
867        create_if_not_exists: true,
868        table_options: Default::default(),
869        table_id: None,
870        engine: "mito".to_string(),
871    })
872}
873
874/// simply build by schema, return first timestamp column and no primary key
875fn build_by_sql_schema(plan: &LogicalPlan) -> Result<TableDef, Error> {
876    let first_time_stamp = plan.schema().fields().iter().find_map(|f| {
877        if ConcreteDataType::from_arrow_type(f.data_type()).is_timestamp() {
878            Some(f.name().clone())
879        } else {
880            None
881        }
882    });
883    Ok(TableDef {
884        ts_col: first_time_stamp,
885        pks: vec![],
886    })
887}
888
889/// Return first timestamp column found in output schema and all string columns
890fn build_by_tql_schema(plan: &LogicalPlan) -> Result<TableDef, Error> {
891    let first_time_stamp = plan.schema().fields().iter().find_map(|f| {
892        if ConcreteDataType::from_arrow_type(f.data_type()).is_timestamp() {
893            Some(f.name().clone())
894        } else {
895            None
896        }
897    });
898    let string_columns = plan
899        .schema()
900        .fields()
901        .iter()
902        .filter_map(|f| {
903            if ConcreteDataType::from_arrow_type(f.data_type()).is_string() {
904                Some(f.name().clone())
905            } else {
906                None
907            }
908        })
909        .collect::<Vec<_>>();
910
911    Ok(TableDef {
912        ts_col: first_time_stamp,
913        pks: string_columns,
914    })
915}
916
/// Inferred key layout for an auto-created sink table: which output column (if
/// any) becomes the time index, and which columns become primary keys.
struct TableDef {
    /// Name of the column chosen as time index; `None` when the plan's output
    /// schema has no suitable timestamp column (a placeholder is added later).
    ts_col: Option<String>,
    /// Output column names to use as primary keys (never contains `ts_col`).
    pks: Vec<String>,
}
921
922/// Return first timestamp column which is in group by clause and other columns which are also in group by clause
923///
924/// # Returns
925///
926/// * `Option<String>` - first timestamp column which is in group by clause
927/// * `Vec<String>` - other columns which are also in group by clause
928///
929/// if no aggregation found, return None
930fn build_pk_from_aggr(plan: &LogicalPlan) -> Result<Option<TableDef>, Error> {
931    let fields = plan.schema().fields();
932    let mut pk_names = FindGroupByFinalName::default();
933
934    plan.visit(&mut pk_names)
935        .with_context(|_| DatafusionSnafu {
936            context: format!("Can't find aggr expr in plan {plan:?}"),
937        })?;
938
939    // if no group by clause, return empty with first timestamp column found in output schema
940    let Some(pk_final_names) = pk_names.get_group_expr_names() else {
941        return Ok(None);
942    };
943    if pk_final_names.is_empty() {
944        let first_ts_col = fields
945            .iter()
946            .find(|f| ConcreteDataType::from_arrow_type(f.data_type()).is_timestamp())
947            .map(|f| f.name().clone());
948        return Ok(Some(TableDef {
949            ts_col: first_ts_col,
950            pks: vec![],
951        }));
952    }
953
954    let all_pk_cols: Vec<_> = fields
955        .iter()
956        .filter(|f| pk_final_names.contains(f.name()))
957        .map(|f| f.name().clone())
958        .collect();
959    // auto create table use first timestamp column in group by clause as time index
960    let first_time_stamp = fields
961        .iter()
962        .find(|f| {
963            all_pk_cols.contains(&f.name().clone())
964                && ConcreteDataType::from_arrow_type(f.data_type()).is_timestamp()
965        })
966        .map(|f| f.name().clone());
967
968    let all_pk_cols: Vec<_> = all_pk_cols
969        .into_iter()
970        .filter(|col| first_time_stamp.as_ref() != Some(col))
971        .collect();
972
973    Ok(Some(TableDef {
974        ts_col: first_time_stamp,
975        pks: all_pk_cols,
976    }))
977}
978
#[cfg(test)]
mod test {
    use api::v1::column_def::try_as_column_schema;
    use pretty_assertions::assert_eq;
    use session::context::QueryContext;

    use super::*;
    use crate::test_utils::create_test_query_engine;

    /// Verify that `create_table_with_expr` infers the expected sink-table
    /// column schemas, primary keys and time index for SQL flow queries.
    #[tokio::test]
    async fn test_gen_create_table_sql() {
        let query_engine = create_test_query_engine();
        let ctx = QueryContext::arc();
        // One expectation set per input SQL query.
        struct TestCase {
            sql: String,
            sink_table_name: String,
            column_schemas: Vec<ColumnSchema>,
            primary_keys: Vec<String>,
            time_index: String,
        }

        // auto-added `update_at` column expected on every SQL-flow sink table
        let update_at_schema = ColumnSchema::new(
            AUTO_CREATED_UPDATE_AT_TS_COL,
            ConcreteDataType::timestamp_millisecond_datatype(),
            true,
        );

        // placeholder time index expected when no timestamp column qualifies
        let ts_placeholder_schema = ColumnSchema::new(
            AUTO_CREATED_PLACEHOLDER_TS_COL,
            ConcreteDataType::timestamp_millisecond_datatype(),
            false,
        )
        .with_time_index(true);

        let testcases = vec![
            // plain projection (no GROUP BY): first timestamp column `ts`
            // becomes the time index, no primary keys
            TestCase {
                sql: "SELECT number, ts FROM numbers_with_ts".to_string(),
                sink_table_name: "new_table".to_string(),
                column_schemas: vec![
                    ColumnSchema::new("number", ConcreteDataType::uint32_datatype(), true),
                    ColumnSchema::new(
                        "ts",
                        ConcreteDataType::timestamp_millisecond_datatype(),
                        false,
                    )
                    .with_time_index(true),
                    update_at_schema.clone(),
                ],
                primary_keys: vec![],
                time_index: "ts".to_string(),
            },
            // `ts` is aggregated (not in GROUP BY), so there is no timestamp
            // column in the group-by keys: `number` becomes the primary key and
            // the placeholder time index column is appended
            TestCase {
                sql: "SELECT number, max(ts) FROM numbers_with_ts GROUP BY number".to_string(),
                sink_table_name: "new_table".to_string(),
                column_schemas: vec![
                    ColumnSchema::new("number", ConcreteDataType::uint32_datatype(), true),
                    ColumnSchema::new(
                        "max(numbers_with_ts.ts)",
                        ConcreteDataType::timestamp_millisecond_datatype(),
                        true,
                    ),
                    update_at_schema.clone(),
                    ts_placeholder_schema.clone(),
                ],
                primary_keys: vec!["number".to_string()],
                time_index: AUTO_CREATED_PLACEHOLDER_TS_COL.to_string(),
            },
            // GROUP BY ts: `ts` becomes the time index and is excluded from the
            // primary keys, leaving none
            TestCase {
                sql: "SELECT max(number), ts FROM numbers_with_ts GROUP BY ts".to_string(),
                sink_table_name: "new_table".to_string(),
                column_schemas: vec![
                    ColumnSchema::new(
                        "max(numbers_with_ts.number)",
                        ConcreteDataType::uint32_datatype(),
                        true,
                    ),
                    ColumnSchema::new(
                        "ts",
                        ConcreteDataType::timestamp_millisecond_datatype(),
                        false,
                    )
                    .with_time_index(true),
                    update_at_schema.clone(),
                ],
                primary_keys: vec![],
                time_index: "ts".to_string(),
            },
            // GROUP BY ts, number: `ts` becomes the time index, the remaining
            // group-by column `number` becomes the primary key
            TestCase {
                sql: "SELECT number, ts FROM numbers_with_ts GROUP BY ts, number".to_string(),
                sink_table_name: "new_table".to_string(),
                column_schemas: vec![
                    ColumnSchema::new("number", ConcreteDataType::uint32_datatype(), true),
                    ColumnSchema::new(
                        "ts",
                        ConcreteDataType::timestamp_millisecond_datatype(),
                        false,
                    )
                    .with_time_index(true),
                    update_at_schema.clone(),
                ],
                primary_keys: vec!["number".to_string()],
                time_index: "ts".to_string(),
            },
        ];

        for tc in testcases {
            // plan the query, then generate the CREATE TABLE expression from it
            let plan = sql_to_df_plan(ctx.clone(), query_engine.clone(), &tc.sql, true)
                .await
                .unwrap();
            let expr = create_table_with_expr(
                &plan,
                &[
                    "greptime".to_string(),
                    "public".to_string(),
                    tc.sink_table_name.clone(),
                ],
                &QueryType::Sql,
            )
            .unwrap();
            // TODO(discord9): assert expr
            // compare column defs by round-tripping them back to ColumnSchema
            let column_schemas = expr
                .column_defs
                .iter()
                .map(|c| try_as_column_schema(c).unwrap())
                .collect::<Vec<_>>();
            assert_eq!(tc.column_schemas, column_schemas, "{:?}", tc.sql);
            assert_eq!(tc.primary_keys, expr.primary_keys, "{:?}", tc.sql);
            assert_eq!(tc.time_index, expr.time_index, "{:?}", tc.sql);
        }
    }
}