flow/batching_mode/
task.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeSet, HashMap, HashSet};
16use std::sync::{Arc, RwLock};
17use std::time::{Duration, SystemTime, UNIX_EPOCH};
18
19use api::v1::CreateTableExpr;
20use catalog::CatalogManagerRef;
21use common_error::ext::BoxedError;
22use common_query::logical_plan::breakup_insert_plan;
23use common_telemetry::tracing::warn;
24use common_telemetry::{debug, info};
25use common_time::Timestamp;
26use datafusion::datasource::DefaultTableSource;
27use datafusion::sql::unparser::expr_to_sql;
28use datafusion_common::DFSchemaRef;
29use datafusion_common::tree_node::{Transformed, TreeNode};
30use datafusion_expr::{DmlStatement, LogicalPlan, WriteOp};
31use datatypes::prelude::ConcreteDataType;
32use datatypes::schema::{ColumnSchema, Schema};
33use operator::expr_helper::column_schemas_to_defs;
34use query::QueryEngineRef;
35use query::query_engine::DefaultSerializer;
36use session::context::QueryContextRef;
37use snafu::{OptionExt, ResultExt, ensure};
38use sql::parser::{ParseOptions, ParserContext};
39use sql::statements::statement::Statement;
40use store_api::mito_engine_options::MERGE_MODE_KEY;
41use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
42use table::table::adapter::DfTableProviderAdapter;
43use tokio::sync::oneshot;
44use tokio::sync::oneshot::error::TryRecvError;
45use tokio::time::Instant;
46
47use crate::adapter::{AUTO_CREATED_PLACEHOLDER_TS_COL, AUTO_CREATED_UPDATE_AT_TS_COL};
48use crate::batching_mode::BatchingModeOptions;
49use crate::batching_mode::frontend_client::FrontendClient;
50use crate::batching_mode::state::{FilterExprInfo, TaskState};
51use crate::batching_mode::time_window::TimeWindowExpr;
52use crate::batching_mode::utils::{
53    AddFilterRewriter, ColumnMatcherRewriter, FindGroupByFinalName, gen_plan_with_matching_schema,
54    get_table_info_df_schema, sql_to_df_plan,
55};
56use crate::df_optimizer::apply_df_optimizer;
57use crate::error::{
58    ConvertColumnSchemaSnafu, DatafusionSnafu, ExternalSnafu, InvalidQuerySnafu,
59    SubstraitEncodeLogicalPlanSnafu, UnexpectedSnafu,
60};
61use crate::metrics::{
62    METRIC_FLOW_BATCHING_ENGINE_ERROR_CNT, METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME,
63    METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY, METRIC_FLOW_BATCHING_ENGINE_START_QUERY_CNT,
64    METRIC_FLOW_ROWS,
65};
66use crate::{Error, FlowId};
67
/// The task's config, immutable once created
#[derive(Clone)]
pub struct TaskConfig {
    /// Id of the flow this task evaluates
    pub flow_id: FlowId,
    /// The original query text of the flow
    pub query: String,
    /// output schema of the query
    pub output_schema: DFSchemaRef,
    /// Expression used to compute time window bounds for the query, if one was derived
    pub time_window_expr: Option<TimeWindowExpr>,
    /// in seconds
    pub expire_after: Option<i64>,
    /// Fully qualified sink table name as `[catalog, schema, table]`
    pub sink_table_name: [String; 3],
    /// Fully qualified names of the source tables referenced by the query
    pub source_table_names: HashSet<[String; 3]>,
    /// Used to resolve table existence and schema information
    pub catalog_manager: CatalogManagerRef,
    /// Whether the query is SQL or TQL (determined once at creation)
    pub query_type: QueryType,
    /// Batching-engine-wide options (refresh durations, filter limits, timeouts)
    pub batch_opts: Arc<BatchingModeOptions>,
    /// Fixed evaluation interval; when `None` the next run time is computed
    /// from task state instead (see `start_executing_loop`)
    pub flow_eval_interval: Option<Duration>,
}
85
86fn determine_query_type(query: &str, query_ctx: &QueryContextRef) -> Result<QueryType, Error> {
87    let stmts =
88        ParserContext::create_with_dialect(query, query_ctx.sql_dialect(), ParseOptions::default())
89            .map_err(BoxedError::new)
90            .context(ExternalSnafu)?;
91
92    ensure!(
93        stmts.len() == 1,
94        InvalidQuerySnafu {
95            reason: format!("Expect only one statement, found {}", stmts.len())
96        }
97    );
98    let stmt = &stmts[0];
99    match stmt {
100        Statement::Tql(_) => Ok(QueryType::Tql),
101        _ => Ok(QueryType::Sql),
102    }
103}
104
105fn is_merge_mode_last_non_null(options: &HashMap<String, String>) -> bool {
106    options
107        .get(MERGE_MODE_KEY)
108        .map(|mode| mode.eq_ignore_ascii_case("last_non_null"))
109        .unwrap_or(false)
110}
111
/// Kind of the flow's defining query, decided by `determine_query_type`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum QueryType {
    /// query is a tql query
    Tql,
    /// query is a sql query
    Sql,
}
119
/// A batching-mode flow task: immutable configuration plus shared mutable state.
///
/// Cloning is cheap — both fields are `Arc`s shared between clones.
#[derive(Clone)]
pub struct BatchingTask {
    /// Immutable task configuration, fixed at creation
    pub config: Arc<TaskConfig>,
    /// Mutable runtime state (query context, dirty time windows, shutdown receiver)
    pub state: Arc<RwLock<TaskState>>,
}
125
/// Arguments for creating batching task
pub struct TaskArgs<'a> {
    /// Id of the flow this task evaluates
    pub flow_id: FlowId,
    /// The flow's query text (owned copy is taken by the task)
    pub query: &'a str,
    /// Planned form of `query`; only its schema is kept on the task config
    pub plan: LogicalPlan,
    /// Time window expression derived from the query, if any
    pub time_window_expr: Option<TimeWindowExpr>,
    /// Expiration horizon in seconds, if set
    pub expire_after: Option<i64>,
    /// Sink table name as `[catalog, schema, table]`
    pub sink_table_name: [String; 3],
    /// Source tables referenced by the query (deduplicated into a set on the config)
    pub source_table_names: Vec<[String; 3]>,
    /// Session context; moved into the task's mutable state
    pub query_ctx: QueryContextRef,
    pub catalog_manager: CatalogManagerRef,
    /// Receiver used by the execution loop to observe shutdown requests
    pub shutdown_rx: oneshot::Receiver<()>,
    pub batch_opts: Arc<BatchingModeOptions>,
    /// Optional fixed evaluation interval for the execution loop
    pub flow_eval_interval: Option<Duration>,
}
141
/// A generated logical plan together with the time-window filter that produced it.
pub struct PlanInfo {
    /// The (possibly rewritten) logical plan to execute
    pub plan: LogicalPlan,
    /// Filter restricting the plan to dirty time windows; `None` when the plan
    /// was generated without a time-window filter
    pub filter: Option<FilterExprInfo>,
}
146
147impl BatchingTask {
148    #[allow(clippy::too_many_arguments)]
149    pub fn try_new(
150        TaskArgs {
151            flow_id,
152            query,
153            plan,
154            time_window_expr,
155            expire_after,
156            sink_table_name,
157            source_table_names,
158            query_ctx,
159            catalog_manager,
160            shutdown_rx,
161            batch_opts,
162            flow_eval_interval,
163        }: TaskArgs<'_>,
164    ) -> Result<Self, Error> {
165        Ok(Self {
166            config: Arc::new(TaskConfig {
167                flow_id,
168                query: query.to_string(),
169                time_window_expr,
170                expire_after,
171                sink_table_name,
172                source_table_names: source_table_names.into_iter().collect(),
173                catalog_manager,
174                output_schema: plan.schema().clone(),
175                query_type: determine_query_type(query, &query_ctx)?,
176                batch_opts,
177                flow_eval_interval,
178            }),
179            state: Arc::new(RwLock::new(TaskState::new(query_ctx, shutdown_rx))),
180        })
181    }
182
    /// mark time window range (now - expire_after, now) as dirty (or (0, now) if expire_after not set)
    ///
    /// useful for flush_flow to flush dirty time windows range
    pub fn mark_all_windows_as_dirty(&self) -> Result<(), Error> {
        // Current wall-clock time, truncated to whole seconds.
        let now = SystemTime::now();
        let now = Timestamp::new_second(
            now.duration_since(UNIX_EPOCH)
                .expect("Time went backwards")
                .as_secs() as _,
        );
        // Lower bound is `now - expire_after` when set; otherwise epoch (0),
        // i.e. everything up to now is considered dirty.
        let lower_bound = self
            .config
            .expire_after
            .map(|e| now.sub_duration(Duration::from_secs(e as _)))
            .transpose()
            .map_err(BoxedError::new)
            .context(ExternalSnafu)?
            .unwrap_or(Timestamp::new_second(0));
        debug!(
            "Flow {} mark range ({:?}, {:?}) as dirty",
            self.config.flow_id, lower_bound, now
        );
        self.state
            .write()
            .unwrap()
            .dirty_time_windows
            .add_window(lower_bound, Some(now));
        Ok(())
    }
212
213    /// Create sink table if not exists
214    pub async fn check_or_create_sink_table(
215        &self,
216        engine: &QueryEngineRef,
217        frontend_client: &Arc<FrontendClient>,
218    ) -> Result<Option<(u32, Duration)>, Error> {
219        if !self.is_table_exist(&self.config.sink_table_name).await? {
220            let create_table = self.gen_create_table_expr(engine.clone()).await?;
221            info!(
222                "Try creating sink table(if not exists) with expr: {:?}",
223                create_table
224            );
225            self.create_table(frontend_client, create_table).await?;
226            info!(
227                "Sink table {}(if not exists) created",
228                self.config.sink_table_name.join(".")
229            );
230        }
231
232        Ok(None)
233    }
234
235    async fn is_table_exist(&self, table_name: &[String; 3]) -> Result<bool, Error> {
236        self.config
237            .catalog_manager
238            .table_exists(&table_name[0], &table_name[1], &table_name[2], None)
239            .await
240            .map_err(BoxedError::new)
241            .context(ExternalSnafu)
242    }
243
244    pub async fn gen_exec_once(
245        &self,
246        engine: &QueryEngineRef,
247        frontend_client: &Arc<FrontendClient>,
248        max_window_cnt: Option<usize>,
249    ) -> Result<Option<(u32, Duration)>, Error> {
250        if let Some(new_query) = self.gen_insert_plan(engine, max_window_cnt).await? {
251            debug!("Generate new query: {}", new_query.plan);
252            self.execute_logical_plan(frontend_client, &new_query.plan)
253                .await
254        } else {
255            debug!("Generate no query");
256            Ok(None)
257        }
258    }
259
    /// Generate an `INSERT INTO <sink>` logical plan wrapping the time-windowed
    /// flow query, or `None` when there is no dirty data to process.
    ///
    /// Validates that every column produced by the query exists in the sink
    /// table before building the DML node.
    pub async fn gen_insert_plan(
        &self,
        engine: &QueryEngineRef,
        max_window_cnt: Option<usize>,
    ) -> Result<Option<PlanInfo>, Error> {
        let (table, df_schema) = get_table_info_df_schema(
            self.config.catalog_manager.clone(),
            self.config.sink_table_name.clone(),
        )
        .await?;

        let table_meta = &table.table_info().meta;
        let merge_mode_last_non_null =
            is_merge_mode_last_non_null(&table_meta.options.extra_options);
        let primary_key_indices = table_meta.primary_key_indices.clone();

        // `merge_mode_last_non_null` is passed as `allow_partial`: with
        // last_non_null merge the insert may omit some columns.
        let new_query = self
            .gen_query_with_time_window(
                engine.clone(),
                &table.table_info().meta.schema,
                &primary_key_indices,
                merge_mode_last_non_null,
                max_window_cnt,
            )
            .await?;

        let insert_into_info = if let Some(new_query) = new_query {
            // first check if all columns in input query exists in sink table
            // since insert into ref to names in record batch generate by given query
            let table_columns = df_schema
                .columns()
                .into_iter()
                .map(|c| c.name)
                .collect::<BTreeSet<_>>();
            for column in new_query.plan.schema().columns() {
                ensure!(
                    table_columns.contains(column.name()),
                    InvalidQuerySnafu {
                        reason: format!(
                            "Column {} not found in sink table with columns {:?}",
                            column, table_columns
                        ),
                    }
                );
            }

            let table_provider = Arc::new(DfTableProviderAdapter::new(table));
            let table_source = Arc::new(DefaultTableSource::new(table_provider));

            // update_at& time index placeholder (if exists) should have default value
            let plan = LogicalPlan::Dml(DmlStatement::new(
                datafusion_common::TableReference::Full {
                    catalog: self.config.sink_table_name[0].clone().into(),
                    schema: self.config.sink_table_name[1].clone().into(),
                    table: self.config.sink_table_name[2].clone().into(),
                },
                table_source,
                WriteOp::Insert(datafusion_expr::dml::InsertOp::Append),
                Arc::new(new_query.plan),
            ));
            PlanInfo {
                plan,
                filter: new_query.filter,
            }
        } else {
            // No dirty windows — nothing to insert this round.
            return Ok(None);
        };
        // Recompute the schema so the DML node reflects the wrapped input plan.
        let insert_into = insert_into_info
            .plan
            .recompute_schema()
            .context(DatafusionSnafu {
                context: "Failed to recompute schema",
            })?;

        Ok(Some(PlanInfo {
            plan: insert_into,
            filter: insert_into_info.filter,
        }))
    }
339
340    pub async fn create_table(
341        &self,
342        frontend_client: &Arc<FrontendClient>,
343        expr: CreateTableExpr,
344    ) -> Result<(), Error> {
345        let catalog = &self.config.sink_table_name[0];
346        let schema = &self.config.sink_table_name[1];
347        frontend_client
348            .create(expr.clone(), catalog, schema)
349            .await?;
350        Ok(())
351    }
352
    /// Execute `plan` on a frontend and return `(affected_rows, elapsed)`.
    ///
    /// Table references are fully qualified first, the plan is encoded via
    /// Substrait (with a special path for insert plans), then sent through
    /// `frontend_client`. Execution time and outcome are recorded in metrics
    /// and in the task state.
    pub async fn execute_logical_plan(
        &self,
        frontend_client: &Arc<FrontendClient>,
        plan: &LogicalPlan,
    ) -> Result<Option<(u32, Duration)>, Error> {
        let instant = Instant::now();
        let flow_id = self.config.flow_id;

        debug!(
            "Executing flow {flow_id}(expire_after={:?} secs) with query {}",
            self.config.expire_after, &plan
        );

        let catalog = &self.config.sink_table_name[0];
        let schema = &self.config.sink_table_name[1];

        // fix all table ref by make it fully qualified, i.e. "table_name" => "catalog_name.schema_name.table_name"
        let plan = plan
            .clone()
            .transform_down_with_subqueries(|p| {
                if let LogicalPlan::TableScan(mut table_scan) = p {
                    let resolved = table_scan.table_name.resolve(catalog, schema);
                    table_scan.table_name = resolved.into();
                    Ok(Transformed::yes(LogicalPlan::TableScan(table_scan)))
                } else {
                    Ok(Transformed::no(p))
                }
            })
            .with_context(|_| DatafusionSnafu {
                context: format!("Failed to fix table ref in logical plan, plan={:?}", plan),
            })?
            .data;

        // Filled in by `frontend_client.handle` with the peer that served the
        // request; used for error/slow-query logging below.
        let mut peer_desc = None;

        let res = {
            // Timer covers encoding plus the frontend round-trip.
            let _timer = METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME
                .with_label_values(&[flow_id.to_string().as_str()])
                .start_timer();

            // hack and special handling the insert logical plan
            let req = if let Some((insert_to, insert_plan)) =
                breakup_insert_plan(&plan, catalog, schema)
            {
                let message = DFLogicalSubstraitConvertor {}
                    .encode(&insert_plan, DefaultSerializer)
                    .context(SubstraitEncodeLogicalPlanSnafu)?;
                api::v1::greptime_request::Request::Query(api::v1::QueryRequest {
                    query: Some(api::v1::query_request::Query::InsertIntoPlan(
                        api::v1::InsertIntoPlan {
                            table_name: Some(insert_to),
                            logical_plan: message.to_vec(),
                        },
                    )),
                })
            } else {
                let message = DFLogicalSubstraitConvertor {}
                    .encode(&plan, DefaultSerializer)
                    .context(SubstraitEncodeLogicalPlanSnafu)?;

                api::v1::greptime_request::Request::Query(api::v1::QueryRequest {
                    query: Some(api::v1::query_request::Query::LogicalPlan(message.to_vec())),
                })
            };

            frontend_client
                .handle(req, catalog, schema, &mut peer_desc)
                .await
        };

        let elapsed = instant.elapsed();
        if let Ok(affected_rows) = &res {
            debug!(
                "Flow {flow_id} executed, affected_rows: {affected_rows:?}, elapsed: {:?}",
                elapsed
            );
            METRIC_FLOW_ROWS
                .with_label_values(&[format!("{}-out-batching", flow_id).as_str()])
                .inc_by(*affected_rows as _);
        } else if let Err(err) = &res {
            warn!(
                "Failed to execute Flow {flow_id} on frontend {:?}, result: {err:?}, elapsed: {:?} with query: {}",
                peer_desc, elapsed, &plan
            );
        }

        // record slow query
        if elapsed >= self.config.batch_opts.slow_query_threshold {
            warn!(
                "Flow {flow_id} on frontend {:?} executed for {:?} before complete, query: {}",
                peer_desc, elapsed, &plan
            );
            METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY
                .with_label_values(&[
                    flow_id.to_string().as_str(),
                    &peer_desc.unwrap_or_default().to_string(),
                ])
                .observe(elapsed.as_secs_f64());
        }

        // Record outcome in task state before propagating any error.
        self.state
            .write()
            .unwrap()
            .after_query_exec(elapsed, res.is_ok());

        let res = res?;

        Ok(Some((res, elapsed)))
    }
462
    /// start executing query in a loop, break when receive shutdown signal
    ///
    /// any error will be logged when executing query
    ///
    /// Scheduling: when `flow_eval_interval` is configured, runs on that fixed
    /// tick; otherwise the next start time is computed from task state. On
    /// failure the consumed dirty windows are restored and the window count is
    /// reduced to 1 as a crude backoff.
    pub async fn start_executing_loop(
        &self,
        engine: QueryEngineRef,
        frontend_client: Arc<FrontendClient>,
    ) {
        let flow_id_str = self.config.flow_id.to_string();
        // `None` means "no explicit cap"; set to Some(1) after a failure and
        // grown by 1 per success up to the configured maximum.
        let mut max_window_cnt = None;
        let mut interval = self
            .config
            .flow_eval_interval
            .map(|d| tokio::time::interval(d));
        if let Some(tick) = &mut interval {
            tick.tick().await; // pass the first tick immediately
        }
        loop {
            // first check if shutdown signal is received
            // if so, break the loop
            {
                let mut state = self.state.write().unwrap();
                match state.shutdown_rx.try_recv() {
                    Ok(()) => break,
                    Err(TryRecvError::Closed) => {
                        // Sender dropped without sending — treat as shutdown.
                        warn!(
                            "Unexpected shutdown flow {}, shutdown anyway",
                            self.config.flow_id
                        );
                        break;
                    }
                    Err(TryRecvError::Empty) => (),
                }
            }
            METRIC_FLOW_BATCHING_ENGINE_START_QUERY_CNT
                .with_label_values(&[&flow_id_str])
                .inc();

            let min_refresh = self.config.batch_opts.experimental_min_refresh_duration;

            let new_query = match self.gen_insert_plan(&engine, max_window_cnt).await {
                Ok(new_query) => new_query,
                Err(err) => {
                    common_telemetry::error!(err; "Failed to generate query for flow={}", self.config.flow_id);
                    // also sleep for a little while before try again to prevent flooding logs
                    tokio::time::sleep(min_refresh).await;
                    continue;
                }
            };

            let res = if let Some(new_query) = &new_query {
                self.execute_logical_plan(&frontend_client, &new_query.plan)
                    .await
            } else {
                Ok(None)
            };

            match res {
                // normal execute, sleep for some time before doing next query
                Ok(Some(_)) => {
                    // can increase max_window_cnt to query more windows next time
                    max_window_cnt = max_window_cnt.map(|cnt| {
                        (cnt + 1).min(self.config.batch_opts.experimental_max_filter_num_per_query)
                    });

                    // here use proper ticking if set eval interval
                    if let Some(eval_interval) = &mut interval {
                        eval_interval.tick().await;
                    } else {
                        // if not explicitly set, just automatically calculate next start time
                        // using time window size and more args
                        let sleep_until = {
                            let state = self.state.write().unwrap();

                            let time_window_size = self
                                .config
                                .time_window_expr
                                .as_ref()
                                .and_then(|t| *t.time_window_size());

                            state.get_next_start_query_time(
                                self.config.flow_id,
                                &time_window_size,
                                min_refresh,
                                Some(self.config.batch_opts.query_timeout),
                                self.config.batch_opts.experimental_max_filter_num_per_query,
                            )
                        };

                        tokio::time::sleep_until(sleep_until).await;
                    };
                }
                // no new data, sleep for some time before checking for new data
                Ok(None) => {
                    debug!(
                        "Flow id = {:?} found no new data, sleep for {:?} then continue",
                        self.config.flow_id, min_refresh
                    );
                    tokio::time::sleep(min_refresh).await;
                    continue;
                }
                // TODO(discord9): this error should have better place to go, but for now just print error, also more context is needed
                Err(err) => {
                    METRIC_FLOW_BATCHING_ENGINE_ERROR_CNT
                        .with_label_values(&[&flow_id_str])
                        .inc();
                    match new_query {
                        Some(query) => {
                            common_telemetry::error!(err; "Failed to execute query for flow={} with query: {}", self.config.flow_id, query.plan);
                            // Re-add dirty windows back since query failed
                            self.state.write().unwrap().dirty_time_windows.add_windows(
                                query.filter.map(|f| f.time_ranges).unwrap_or_default(),
                            );
                            // TODO(discord9): add some backoff here? half the query time window or what
                            // backoff meaning use smaller `max_window_cnt` for next query

                            // since last query failed, we should not try to query too many windows
                            max_window_cnt = Some(1);
                        }
                        None => {
                            common_telemetry::error!(err; "Failed to generate query for flow={}", self.config.flow_id)
                        }
                    }
                    // also sleep for a little while before try again to prevent flooding logs
                    tokio::time::sleep(min_refresh).await;
                }
            }
        }
    }
592
593    /// Generate the create table SQL
594    ///
595    /// the auto created table will automatically added a `update_at` Milliseconds DEFAULT now() column in the end
596    /// (for compatibility with flow streaming mode)
597    ///
598    /// and it will use first timestamp column as time index, all other columns will be added as normal columns and nullable
599    async fn gen_create_table_expr(
600        &self,
601        engine: QueryEngineRef,
602    ) -> Result<CreateTableExpr, Error> {
603        let query_ctx = self.state.read().unwrap().query_ctx.clone();
604        let plan =
605            sql_to_df_plan(query_ctx.clone(), engine.clone(), &self.config.query, true).await?;
606        create_table_with_expr(&plan, &self.config.sink_table_name, &self.config.query_type)
607    }
608
    /// will merge and use the first ten time window in query
    ///
    /// Builds the flow query restricted to dirty time windows:
    /// - SQL query with a resolved time window: adds a filter over the dirty
    ///   windows (capped by `max_window_cnt`), then schema-matching rewrites
    ///   and optimization.
    /// - SQL query without a time window, or TQL: returns the whole query with
    ///   a matching schema and no filter (dirty windows are cleared).
    /// Returns `None` when there is no dirty data to process.
    async fn gen_query_with_time_window(
        &self,
        engine: QueryEngineRef,
        sink_table_schema: &Arc<Schema>,
        primary_key_indices: &[usize],
        allow_partial: bool,
        max_window_cnt: Option<usize>,
    ) -> Result<Option<PlanInfo>, Error> {
        let query_ctx = self.state.read().unwrap().query_ctx.clone();
        let start = SystemTime::now();
        let since_the_epoch = start
            .duration_since(UNIX_EPOCH)
            .expect("Time went backwards");
        // Expiration horizon in epoch seconds; 0 when no expiry is configured.
        let low_bound = self
            .config
            .expire_after
            .map(|e| since_the_epoch.as_secs() - e as u64)
            .unwrap_or(u64::MIN)

        let low_bound = Timestamp::new_second(low_bound as i64);

        // Align the horizon to the time window expression's bounds, if any.
        let expire_time_window_bound = self
            .config
            .time_window_expr
            .as_ref()
            .map(|expr| expr.eval(low_bound))
            .transpose()?;

        let (expire_lower_bound, expire_upper_bound) =
            match (expire_time_window_bound, &self.config.query_type) {
                (Some((Some(l), Some(u))), QueryType::Sql) => (l, u),
                (None, QueryType::Sql) => {
                    // if it's sql query and no time window lower/upper bound is found, just return the original query(with auto columns)
                    // use sink_table_meta to add to query the `update_at` and `__ts_placeholder` column's value too for compatibility reason
                    debug!(
                        "Flow id = {:?}, no time window, using the same query",
                        self.config.flow_id
                    );
                    // clean dirty time window too, this could be from create flow's check_execute
                    let is_dirty = !self.state.read().unwrap().dirty_time_windows.is_empty();
                    self.state.write().unwrap().dirty_time_windows.clean();

                    if !is_dirty {
                        // no dirty data, hence no need to update
                        debug!("Flow id={:?}, no new data, not update", self.config.flow_id);
                        return Ok(None);
                    }

                    let plan = gen_plan_with_matching_schema(
                        &self.config.query,
                        query_ctx,
                        engine,
                        sink_table_schema.clone(),
                        primary_key_indices,
                        allow_partial,
                    )
                    .await?;

                    return Ok(Some(PlanInfo { plan, filter: None }));
                }
                _ => {
                    // clean for tql have no use for time window
                    self.state.write().unwrap().dirty_time_windows.clean();

                    let plan = gen_plan_with_matching_schema(
                        &self.config.query,
                        query_ctx,
                        engine,
                        sink_table_schema.clone(),
                        primary_key_indices,
                        allow_partial,
                    )
                    .await?;

                    return Ok(Some(PlanInfo { plan, filter: None }));
                }
            };

        debug!(
            "Flow id = {:?}, found time window: precise_lower_bound={:?}, precise_upper_bound={:?} with dirty time windows: {:?}",
            self.config.flow_id,
            expire_lower_bound,
            expire_upper_bound,
            self.state.read().unwrap().dirty_time_windows
        );
        let window_size = expire_upper_bound
            .sub(&expire_lower_bound)
            .with_context(|| UnexpectedSnafu {
                reason: format!(
                    "Can't get window size from {expire_upper_bound:?} - {expire_lower_bound:?}"
                ),
            })?;
        let col_name = self
            .config
            .time_window_expr
            .as_ref()
            .map(|expr| expr.column_name.clone())
            .with_context(|| UnexpectedSnafu {
                reason: format!(
                    "Flow id={:?}, Failed to get column name from time window expr",
                    self.config.flow_id
                ),
            })?;

        // Consume dirty windows into a filter expression over the time column,
        // limited to at most `max_window_cnt` windows per query.
        let expr = self
            .state
            .write()
            .unwrap()
            .dirty_time_windows
            .gen_filter_exprs(
                &col_name,
                Some(expire_lower_bound),
                window_size,
                max_window_cnt
                    .unwrap_or(self.config.batch_opts.experimental_max_filter_num_per_query),
                self.config.flow_id,
                Some(self),
            )?;

        debug!(
            "Flow id={:?}, Generated filter expr: {:?}",
            self.config.flow_id,
            expr.as_ref()
                .map(
                    |expr| expr_to_sql(&expr.expr).with_context(|_| DatafusionSnafu {
                        context: format!("Failed to generate filter expr from {expr:?}"),
                    })
                )
                .transpose()?
                .map(|s| s.to_string())
        );

        let Some(expr) = expr else {
            // no new data, hence no need to update
            debug!("Flow id={:?}, no new data, not update", self.config.flow_id);
            return Ok(None);
        };

        // Rewrite the fresh plan: inject the window filter, then align columns
        // with the sink table schema.
        let mut add_filter = AddFilterRewriter::new(expr.expr.clone());
        let mut add_auto_column = ColumnMatcherRewriter::new(
            sink_table_schema.clone(),
            primary_key_indices.to_vec(),
            allow_partial,
        );

        let plan =
            sql_to_df_plan(query_ctx.clone(), engine.clone(), &self.config.query, false).await?;
        let rewrite = plan
            .clone()
            .rewrite(&mut add_filter)
            .and_then(|p| p.data.rewrite(&mut add_auto_column))
            .with_context(|_| DatafusionSnafu {
                context: format!("Failed to rewrite plan:\n {}\n", plan),
            })?
            .data;
        // only apply optimize after complex rewrite is done
        let new_plan = apply_df_optimizer(rewrite, &query_ctx).await?;

        let info = PlanInfo {
            plan: new_plan.clone(),
            filter: Some(expr),
        };

        Ok(Some(info))
    }
775}
776
777// auto created table have a auto added column `update_at`, and optional have a `AUTO_CREATED_PLACEHOLDER_TS_COL` column for time index placeholder if no timestamp column is specified
778// TODO(discord9): for now no default value is set for auto added column for compatibility reason with streaming mode, but this might change in favor of simpler code?
779fn create_table_with_expr(
780    plan: &LogicalPlan,
781    sink_table_name: &[String; 3],
782    query_type: &QueryType,
783) -> Result<CreateTableExpr, Error> {
784    let table_def = match query_type {
785        &QueryType::Sql => {
786            if let Some(def) = build_pk_from_aggr(plan)? {
787                def
788            } else {
789                build_by_sql_schema(plan)?
790            }
791        }
792        QueryType::Tql => {
793            // first try build from aggr, then from tql schema because tql query might not have aggr node
794            if let Some(table_def) = build_pk_from_aggr(plan)? {
795                table_def
796            } else {
797                build_by_tql_schema(plan)?
798            }
799        }
800    };
801    let first_time_stamp = table_def.ts_col;
802    let primary_keys = table_def.pks;
803
804    let mut column_schemas = Vec::new();
805    for field in plan.schema().fields() {
806        let name = field.name();
807        let ty = ConcreteDataType::from_arrow_type(field.data_type());
808        let col_schema = if first_time_stamp == Some(name.clone()) {
809            ColumnSchema::new(name, ty, false).with_time_index(true)
810        } else {
811            ColumnSchema::new(name, ty, true)
812        };
813
814        match query_type {
815            QueryType::Sql => {
816                column_schemas.push(col_schema);
817            }
818            QueryType::Tql => {
819                // if is val column, need to rename as val DOUBLE NULL
820                // if is tag column, need to cast type as STRING NULL
821                let is_tag_column = primary_keys.contains(name);
822                let is_val_column = !is_tag_column && first_time_stamp.as_ref() != Some(name);
823                if is_val_column {
824                    let col_schema =
825                        ColumnSchema::new(name, ConcreteDataType::float64_datatype(), true);
826                    column_schemas.push(col_schema);
827                } else if is_tag_column {
828                    let col_schema =
829                        ColumnSchema::new(name, ConcreteDataType::string_datatype(), true);
830                    column_schemas.push(col_schema);
831                } else {
832                    // time index column
833                    column_schemas.push(col_schema);
834                }
835            }
836        }
837    }
838
839    if query_type == &QueryType::Sql {
840        let update_at_schema = ColumnSchema::new(
841            AUTO_CREATED_UPDATE_AT_TS_COL,
842            ConcreteDataType::timestamp_millisecond_datatype(),
843            true,
844        );
845        column_schemas.push(update_at_schema);
846    }
847
848    let time_index = if let Some(time_index) = first_time_stamp {
849        time_index
850    } else {
851        column_schemas.push(
852            ColumnSchema::new(
853                AUTO_CREATED_PLACEHOLDER_TS_COL,
854                ConcreteDataType::timestamp_millisecond_datatype(),
855                false,
856            )
857            .with_time_index(true),
858        );
859        AUTO_CREATED_PLACEHOLDER_TS_COL.to_string()
860    };
861
862    let column_defs =
863        column_schemas_to_defs(column_schemas, &primary_keys).context(ConvertColumnSchemaSnafu)?;
864    Ok(CreateTableExpr {
865        catalog_name: sink_table_name[0].clone(),
866        schema_name: sink_table_name[1].clone(),
867        table_name: sink_table_name[2].clone(),
868        desc: "Auto created table by flow engine".to_string(),
869        column_defs,
870        time_index,
871        primary_keys,
872        create_if_not_exists: true,
873        table_options: Default::default(),
874        table_id: None,
875        engine: "mito".to_string(),
876    })
877}
878
879/// simply build by schema, return first timestamp column and no primary key
880fn build_by_sql_schema(plan: &LogicalPlan) -> Result<TableDef, Error> {
881    let first_time_stamp = plan.schema().fields().iter().find_map(|f| {
882        if ConcreteDataType::from_arrow_type(f.data_type()).is_timestamp() {
883            Some(f.name().clone())
884        } else {
885            None
886        }
887    });
888    Ok(TableDef {
889        ts_col: first_time_stamp,
890        pks: vec![],
891    })
892}
893
894/// Return first timestamp column found in output schema and all string columns
895fn build_by_tql_schema(plan: &LogicalPlan) -> Result<TableDef, Error> {
896    let first_time_stamp = plan.schema().fields().iter().find_map(|f| {
897        if ConcreteDataType::from_arrow_type(f.data_type()).is_timestamp() {
898            Some(f.name().clone())
899        } else {
900            None
901        }
902    });
903    let string_columns = plan
904        .schema()
905        .fields()
906        .iter()
907        .filter_map(|f| {
908            if ConcreteDataType::from_arrow_type(f.data_type()).is_string() {
909                Some(f.name().clone())
910            } else {
911                None
912            }
913        })
914        .collect::<Vec<_>>();
915
916    Ok(TableDef {
917        ts_col: first_time_stamp,
918        pks: string_columns,
919    })
920}
921
/// Inferred sink-table definition: which column (if any) serves as the time index and
/// which columns form the primary key.
struct TableDef {
    // Name of the time index column; `None` when the schema has no timestamp column.
    ts_col: Option<String>,
    // Primary key (tag) column names, excluding the time index column.
    pks: Vec<String>,
}
926
927/// Return first timestamp column which is in group by clause and other columns which are also in group by clause
928///
929/// # Returns
930///
931/// * `Option<String>` - first timestamp column which is in group by clause
932/// * `Vec<String>` - other columns which are also in group by clause
933///
934/// if no aggregation found, return None
935fn build_pk_from_aggr(plan: &LogicalPlan) -> Result<Option<TableDef>, Error> {
936    let fields = plan.schema().fields();
937    let mut pk_names = FindGroupByFinalName::default();
938
939    plan.visit(&mut pk_names)
940        .with_context(|_| DatafusionSnafu {
941            context: format!("Can't find aggr expr in plan {plan:?}"),
942        })?;
943
944    // if no group by clause, return empty with first timestamp column found in output schema
945    let Some(pk_final_names) = pk_names.get_group_expr_names() else {
946        return Ok(None);
947    };
948    if pk_final_names.is_empty() {
949        let first_ts_col = fields
950            .iter()
951            .find(|f| ConcreteDataType::from_arrow_type(f.data_type()).is_timestamp())
952            .map(|f| f.name().clone());
953        return Ok(Some(TableDef {
954            ts_col: first_ts_col,
955            pks: vec![],
956        }));
957    }
958
959    let all_pk_cols: Vec<_> = fields
960        .iter()
961        .filter(|f| pk_final_names.contains(f.name()))
962        .map(|f| f.name().clone())
963        .collect();
964    // auto create table use first timestamp column in group by clause as time index
965    let first_time_stamp = fields
966        .iter()
967        .find(|f| {
968            all_pk_cols.contains(&f.name().clone())
969                && ConcreteDataType::from_arrow_type(f.data_type()).is_timestamp()
970        })
971        .map(|f| f.name().clone());
972
973    let all_pk_cols: Vec<_> = all_pk_cols
974        .into_iter()
975        .filter(|col| first_time_stamp.as_ref() != Some(col))
976        .collect();
977
978    Ok(Some(TableDef {
979        ts_col: first_time_stamp,
980        pks: all_pk_cols,
981    }))
982}
983
#[cfg(test)]
mod test {
    use api::v1::column_def::try_as_column_schema;
    use pretty_assertions::assert_eq;
    use session::context::QueryContext;

    use super::*;
    use crate::test_utils::create_test_query_engine;

    /// Verify that `create_table_with_expr` derives the expected column schemas,
    /// primary keys and time index from a variety of SQL queries.
    #[tokio::test]
    async fn test_gen_create_table_sql() {
        let query_engine = create_test_query_engine();
        let ctx = QueryContext::arc();
        // Expected sink-table definition for each input SQL query.
        struct TestCase {
            sql: String,
            sink_table_name: String,
            column_schemas: Vec<ColumnSchema>,
            primary_keys: Vec<String>,
            time_index: String,
        }

        // `update_at` column auto-added to every SQL-created sink table.
        let update_at_schema = ColumnSchema::new(
            AUTO_CREATED_UPDATE_AT_TS_COL,
            ConcreteDataType::timestamp_millisecond_datatype(),
            true,
        );

        // Placeholder time index column, expected when the query output has no usable
        // timestamp column.
        let ts_placeholder_schema = ColumnSchema::new(
            AUTO_CREATED_PLACEHOLDER_TS_COL,
            ConcreteDataType::timestamp_millisecond_datatype(),
            false,
        )
        .with_time_index(true);

        let testcases = vec![
            // Plain projection: `ts` becomes the time index, no primary keys.
            TestCase {
                sql: "SELECT number, ts FROM numbers_with_ts".to_string(),
                sink_table_name: "new_table".to_string(),
                column_schemas: vec![
                    ColumnSchema::new("number", ConcreteDataType::uint32_datatype(), true),
                    ColumnSchema::new(
                        "ts",
                        ConcreteDataType::timestamp_millisecond_datatype(),
                        false,
                    )
                    .with_time_index(true),
                    update_at_schema.clone(),
                ],
                primary_keys: vec![],
                time_index: "ts".to_string(),
            },
            // GROUP BY on a non-timestamp column: `number` becomes the primary key; the
            // aggregated timestamp is a plain value column, so a placeholder time index
            // column is added.
            TestCase {
                sql: "SELECT number, max(ts) FROM numbers_with_ts GROUP BY number".to_string(),
                sink_table_name: "new_table".to_string(),
                column_schemas: vec![
                    ColumnSchema::new("number", ConcreteDataType::uint32_datatype(), true),
                    ColumnSchema::new(
                        "max(numbers_with_ts.ts)",
                        ConcreteDataType::timestamp_millisecond_datatype(),
                        true,
                    ),
                    update_at_schema.clone(),
                    ts_placeholder_schema.clone(),
                ],
                primary_keys: vec!["number".to_string()],
                time_index: AUTO_CREATED_PLACEHOLDER_TS_COL.to_string(),
            },
            // GROUP BY on the timestamp column: `ts` becomes the time index and is
            // excluded from the primary keys.
            TestCase {
                sql: "SELECT max(number), ts FROM numbers_with_ts GROUP BY ts".to_string(),
                sink_table_name: "new_table".to_string(),
                column_schemas: vec![
                    ColumnSchema::new(
                        "max(numbers_with_ts.number)",
                        ConcreteDataType::uint32_datatype(),
                        true,
                    ),
                    ColumnSchema::new(
                        "ts",
                        ConcreteDataType::timestamp_millisecond_datatype(),
                        false,
                    )
                    .with_time_index(true),
                    update_at_schema.clone(),
                ],
                primary_keys: vec![],
                time_index: "ts".to_string(),
            },
            // GROUP BY on both columns: `ts` is the time index, `number` the primary key.
            TestCase {
                sql: "SELECT number, ts FROM numbers_with_ts GROUP BY ts, number".to_string(),
                sink_table_name: "new_table".to_string(),
                column_schemas: vec![
                    ColumnSchema::new("number", ConcreteDataType::uint32_datatype(), true),
                    ColumnSchema::new(
                        "ts",
                        ConcreteDataType::timestamp_millisecond_datatype(),
                        false,
                    )
                    .with_time_index(true),
                    update_at_schema.clone(),
                ],
                primary_keys: vec!["number".to_string()],
                time_index: "ts".to_string(),
            },
        ];

        for tc in testcases {
            // Plan each query (with optimization enabled) and build the create-table expr.
            let plan = sql_to_df_plan(ctx.clone(), query_engine.clone(), &tc.sql, true)
                .await
                .unwrap();
            let expr = create_table_with_expr(
                &plan,
                &[
                    "greptime".to_string(),
                    "public".to_string(),
                    tc.sink_table_name.clone(),
                ],
                &QueryType::Sql,
            )
            .unwrap();
            // TODO(discord9): assert expr
            let column_schemas = expr
                .column_defs
                .iter()
                .map(|c| try_as_column_schema(c).unwrap())
                .collect::<Vec<_>>();
            assert_eq!(tc.column_schemas, column_schemas, "{:?}", tc.sql);
            assert_eq!(tc.primary_keys, expr.primary_keys, "{:?}", tc.sql);
            assert_eq!(tc.time_index, expr.time_index, "{:?}", tc.sql);
        }
    }
}