flow/batching_mode/
task.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeSet, HashMap, HashSet};
16use std::sync::{Arc, RwLock};
17use std::time::{Duration, SystemTime, UNIX_EPOCH};
18
19use api::v1::CreateTableExpr;
20use catalog::CatalogManagerRef;
21use common_error::ext::BoxedError;
22use common_query::logical_plan::breakup_insert_plan;
23use common_telemetry::tracing::warn;
24use common_telemetry::{debug, info};
25use common_time::Timestamp;
26use datafusion::datasource::DefaultTableSource;
27use datafusion::sql::unparser::expr_to_sql;
28use datafusion_common::DFSchemaRef;
29use datafusion_common::tree_node::{Transformed, TreeNode};
30use datafusion_expr::{DmlStatement, LogicalPlan, WriteOp};
31use datatypes::prelude::ConcreteDataType;
32use datatypes::schema::{ColumnSchema, Schema};
33use operator::expr_helper::column_schemas_to_defs;
34use query::QueryEngineRef;
35use query::query_engine::DefaultSerializer;
36use session::context::QueryContextRef;
37use snafu::{OptionExt, ResultExt, ensure};
38use sql::parser::{ParseOptions, ParserContext};
39use sql::statements::statement::Statement;
40use store_api::mito_engine_options::MERGE_MODE_KEY;
41use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
42use table::table::adapter::DfTableProviderAdapter;
43use tokio::sync::oneshot;
44use tokio::sync::oneshot::error::TryRecvError;
45use tokio::time::Instant;
46
47use crate::adapter::{AUTO_CREATED_PLACEHOLDER_TS_COL, AUTO_CREATED_UPDATE_AT_TS_COL};
48use crate::batching_mode::BatchingModeOptions;
49use crate::batching_mode::frontend_client::FrontendClient;
50use crate::batching_mode::state::{FilterExprInfo, TaskState};
51use crate::batching_mode::time_window::TimeWindowExpr;
52use crate::batching_mode::utils::{
53    AddFilterRewriter, ColumnMatcherRewriter, FindGroupByFinalName, gen_plan_with_matching_schema,
54    get_table_info_df_schema, sql_to_df_plan,
55};
56use crate::df_optimizer::apply_df_optimizer;
57use crate::error::{
58    ConvertColumnSchemaSnafu, DatafusionSnafu, ExternalSnafu, InvalidQuerySnafu,
59    SubstraitEncodeLogicalPlanSnafu, UnexpectedSnafu,
60};
61use crate::metrics::{
62    METRIC_FLOW_BATCHING_ENGINE_ERROR_CNT, METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME,
63    METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY, METRIC_FLOW_BATCHING_ENGINE_START_QUERY_CNT,
64    METRIC_FLOW_ROWS,
65};
66use crate::{Error, FlowId};
67
68/// The task's config, immutable once created
/// The task's config, immutable once created
#[derive(Clone)]
pub struct TaskConfig {
    pub flow_id: FlowId,
    /// The original SQL/TQL query text of the flow
    pub query: String,
    /// output schema of the query
    pub output_schema: DFSchemaRef,
    /// Expression used to derive time-window bounds for incremental queries, if any
    pub time_window_expr: Option<TimeWindowExpr>,
    /// in seconds
    pub expire_after: Option<i64>,
    /// Fully qualified sink table name: [catalog, schema, table]
    pub sink_table_name: [String; 3],
    /// Fully qualified names of all source tables referenced by the query
    pub source_table_names: HashSet<[String; 3]>,
    pub catalog_manager: CatalogManagerRef,
    /// Whether `query` parsed as a TQL or SQL statement
    pub query_type: QueryType,
    /// Shared batching-mode engine options
    pub batch_opts: Arc<BatchingModeOptions>,
    /// Fixed evaluation interval; when `None` the next run time is computed dynamically
    pub flow_eval_interval: Option<Duration>,
}
85
86fn determine_query_type(query: &str, query_ctx: &QueryContextRef) -> Result<QueryType, Error> {
87    let stmts =
88        ParserContext::create_with_dialect(query, query_ctx.sql_dialect(), ParseOptions::default())
89            .map_err(BoxedError::new)
90            .context(ExternalSnafu)?;
91
92    ensure!(
93        stmts.len() == 1,
94        InvalidQuerySnafu {
95            reason: format!("Expect only one statement, found {}", stmts.len())
96        }
97    );
98    let stmt = &stmts[0];
99    match stmt {
100        Statement::Tql(_) => Ok(QueryType::Tql),
101        _ => Ok(QueryType::Sql),
102    }
103}
104
105fn is_merge_mode_last_non_null(options: &HashMap<String, String>) -> bool {
106    options
107        .get(MERGE_MODE_KEY)
108        .map(|mode| mode.eq_ignore_ascii_case("last_non_null"))
109        .unwrap_or(false)
110}
111
/// Kind of query language the flow task was written in.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum QueryType {
    /// query is a tql query
    Tql,
    /// query is a sql query
    Sql,
}
119
/// A batching-mode flow task: immutable configuration plus mutable runtime state.
#[derive(Clone)]
pub struct BatchingTask {
    /// Immutable task configuration, shared between clones
    pub config: Arc<TaskConfig>,
    /// Mutable runtime state (query context, dirty time windows, shutdown channel)
    pub state: Arc<RwLock<TaskState>>,
}
125
126/// Arguments for creating batching task
/// Arguments for creating batching task
pub struct TaskArgs<'a> {
    pub flow_id: FlowId,
    /// The flow's SQL/TQL query text
    pub query: &'a str,
    /// Planned form of `query`; only its output schema is retained by the task
    pub plan: LogicalPlan,
    pub time_window_expr: Option<TimeWindowExpr>,
    /// in seconds
    pub expire_after: Option<i64>,
    /// Fully qualified sink table name: [catalog, schema, table]
    pub sink_table_name: [String; 3],
    pub source_table_names: Vec<[String; 3]>,
    pub query_ctx: QueryContextRef,
    pub catalog_manager: CatalogManagerRef,
    /// Receiver signalled when the task should stop its executing loop
    pub shutdown_rx: oneshot::Receiver<()>,
    pub batch_opts: Arc<BatchingModeOptions>,
    /// Fixed evaluation interval; `None` means derive the next run time dynamically
    pub flow_eval_interval: Option<Duration>,
}
141
/// A generated logical plan together with the time-window filter (if any)
/// that was applied when producing it.
pub struct PlanInfo {
    pub plan: LogicalPlan,
    /// Filter restricting the query to dirty time windows; `None` when the
    /// full, unfiltered query was used
    pub filter: Option<FilterExprInfo>,
}
146
147impl BatchingTask {
148    #[allow(clippy::too_many_arguments)]
149    pub fn try_new(
150        TaskArgs {
151            flow_id,
152            query,
153            plan,
154            time_window_expr,
155            expire_after,
156            sink_table_name,
157            source_table_names,
158            query_ctx,
159            catalog_manager,
160            shutdown_rx,
161            batch_opts,
162            flow_eval_interval,
163        }: TaskArgs<'_>,
164    ) -> Result<Self, Error> {
165        Ok(Self {
166            config: Arc::new(TaskConfig {
167                flow_id,
168                query: query.to_string(),
169                time_window_expr,
170                expire_after,
171                sink_table_name,
172                source_table_names: source_table_names.into_iter().collect(),
173                catalog_manager,
174                output_schema: plan.schema().clone(),
175                query_type: determine_query_type(query, &query_ctx)?,
176                batch_opts,
177                flow_eval_interval,
178            }),
179            state: Arc::new(RwLock::new(TaskState::new(query_ctx, shutdown_rx))),
180        })
181    }
182
    /// Timestamp (epoch milliseconds) of the most recent query execution, if any.
    pub fn last_execution_time_millis(&self) -> Option<i64> {
        self.state.read().unwrap().last_execution_time_millis()
    }
186
    /// mark time window range (now - expire_after, now) as dirty (or (0, now) if expire_after not set)
    ///
    /// useful for flush_flow to flush dirty time windows range
    pub fn mark_all_windows_as_dirty(&self) -> Result<(), Error> {
        let now = SystemTime::now();
        let now = Timestamp::new_second(
            now.duration_since(UNIX_EPOCH)
                .expect("Time went backwards")
                .as_secs() as _,
        );
        // Lower bound is `now - expire_after`: anything older is expired
        // anyway. Without `expire_after`, start from epoch second 0.
        let lower_bound = self
            .config
            .expire_after
            .map(|e| now.sub_duration(Duration::from_secs(e as _)))
            .transpose()
            .map_err(BoxedError::new)
            .context(ExternalSnafu)?
            .unwrap_or(Timestamp::new_second(0));
        debug!(
            "Flow {} mark range ({:?}, {:?}) as dirty",
            self.config.flow_id, lower_bound, now
        );
        self.state
            .write()
            .unwrap()
            .dirty_time_windows
            .add_window(lower_bound, Some(now));
        Ok(())
    }
216
217    /// Create sink table if not exists
218    pub async fn check_or_create_sink_table(
219        &self,
220        engine: &QueryEngineRef,
221        frontend_client: &Arc<FrontendClient>,
222    ) -> Result<Option<(u32, Duration)>, Error> {
223        if !self.is_table_exist(&self.config.sink_table_name).await? {
224            let create_table = self.gen_create_table_expr(engine.clone()).await?;
225            info!(
226                "Try creating sink table(if not exists) with expr: {:?}",
227                create_table
228            );
229            self.create_table(frontend_client, create_table).await?;
230            info!(
231                "Sink table {}(if not exists) created",
232                self.config.sink_table_name.join(".")
233            );
234        }
235
236        Ok(None)
237    }
238
    /// Check via the catalog manager whether `[catalog, schema, table]` exists.
    async fn is_table_exist(&self, table_name: &[String; 3]) -> Result<bool, Error> {
        self.config
            .catalog_manager
            .table_exists(&table_name[0], &table_name[1], &table_name[2], None)
            .await
            .map_err(BoxedError::new)
            .context(ExternalSnafu)
    }
247
248    pub async fn gen_exec_once(
249        &self,
250        engine: &QueryEngineRef,
251        frontend_client: &Arc<FrontendClient>,
252        max_window_cnt: Option<usize>,
253    ) -> Result<Option<(u32, Duration)>, Error> {
254        if let Some(new_query) = self.gen_insert_plan(engine, max_window_cnt).await? {
255            debug!("Generate new query: {}", new_query.plan);
256            self.execute_logical_plan(frontend_client, &new_query.plan)
257                .await
258        } else {
259            debug!("Generate no query");
260            Ok(None)
261        }
262    }
263
    /// Build the `INSERT INTO sink_table <query>` logical plan for the next
    /// incremental update.
    ///
    /// Returns `Ok(None)` when there is no dirty data to process, and errors
    /// when the query outputs a column the sink table does not have.
    pub async fn gen_insert_plan(
        &self,
        engine: &QueryEngineRef,
        max_window_cnt: Option<usize>,
    ) -> Result<Option<PlanInfo>, Error> {
        let (table, df_schema) = get_table_info_df_schema(
            self.config.catalog_manager.clone(),
            self.config.sink_table_name.clone(),
        )
        .await?;

        let table_meta = &table.table_info().meta;
        // `last_non_null` merge mode is passed down as `allow_partial`,
        // relaxing schema matching in `gen_query_with_time_window`.
        let merge_mode_last_non_null =
            is_merge_mode_last_non_null(&table_meta.options.extra_options);
        let primary_key_indices = table_meta.primary_key_indices.clone();

        let new_query = self
            .gen_query_with_time_window(
                engine.clone(),
                &table.table_info().meta.schema,
                &primary_key_indices,
                merge_mode_last_non_null,
                max_window_cnt,
            )
            .await?;

        let insert_into_info = if let Some(new_query) = new_query {
            // first check if all columns in input query exists in sink table
            // since insert into ref to names in record batch generate by given query
            let table_columns = df_schema
                .columns()
                .into_iter()
                .map(|c| c.name)
                .collect::<BTreeSet<_>>();
            for column in new_query.plan.schema().columns() {
                ensure!(
                    table_columns.contains(column.name()),
                    InvalidQuerySnafu {
                        reason: format!(
                            "Column {} not found in sink table with columns {:?}",
                            column, table_columns
                        ),
                    }
                );
            }

            let table_provider = Arc::new(DfTableProviderAdapter::new(table));
            let table_source = Arc::new(DefaultTableSource::new(table_provider));

            // update_at& time index placeholder (if exists) should have default value
            let plan = LogicalPlan::Dml(DmlStatement::new(
                datafusion_common::TableReference::Full {
                    catalog: self.config.sink_table_name[0].clone().into(),
                    schema: self.config.sink_table_name[1].clone().into(),
                    table: self.config.sink_table_name[2].clone().into(),
                },
                table_source,
                WriteOp::Insert(datafusion_expr::dml::InsertOp::Append),
                Arc::new(new_query.plan),
            ));
            PlanInfo {
                plan,
                filter: new_query.filter,
            }
        } else {
            return Ok(None);
        };
        // Recompute so the DML node's schema reflects the wrapped query plan.
        let insert_into = insert_into_info
            .plan
            .recompute_schema()
            .context(DatafusionSnafu {
                context: "Failed to recompute schema",
            })?;

        Ok(Some(PlanInfo {
            plan: insert_into,
            filter: insert_into_info.filter,
        }))
    }
343
344    pub async fn create_table(
345        &self,
346        frontend_client: &Arc<FrontendClient>,
347        expr: CreateTableExpr,
348    ) -> Result<(), Error> {
349        let catalog = &self.config.sink_table_name[0];
350        let schema = &self.config.sink_table_name[1];
351        frontend_client
352            .create(expr.clone(), catalog, schema)
353            .await?;
354        Ok(())
355    }
356
    /// Execute `plan` on a frontend, recording query-time/slow-query metrics
    /// and updating task state afterwards.
    ///
    /// Returns the affected row count and elapsed wall time on success.
    pub async fn execute_logical_plan(
        &self,
        frontend_client: &Arc<FrontendClient>,
        plan: &LogicalPlan,
    ) -> Result<Option<(u32, Duration)>, Error> {
        let instant = Instant::now();
        let flow_id = self.config.flow_id;

        debug!(
            "Executing flow {flow_id}(expire_after={:?} secs) with query {}",
            self.config.expire_after, &plan
        );

        let catalog = &self.config.sink_table_name[0];
        let schema = &self.config.sink_table_name[1];

        // fix all table ref by make it fully qualified, i.e. "table_name" => "catalog_name.schema_name.table_name"
        let plan = plan
            .clone()
            .transform_down_with_subqueries(|p| {
                if let LogicalPlan::TableScan(mut table_scan) = p {
                    let resolved = table_scan.table_name.resolve(catalog, schema);
                    table_scan.table_name = resolved.into();
                    Ok(Transformed::yes(LogicalPlan::TableScan(table_scan)))
                } else {
                    Ok(Transformed::no(p))
                }
            })
            .with_context(|_| DatafusionSnafu {
                context: format!("Failed to fix table ref in logical plan, plan={:?}", plan),
            })?
            .data;

        // Filled in by `handle` with which frontend peer served the request;
        // used for error/slow-query reporting below.
        let mut peer_desc = None;

        let res = {
            let _timer = METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME
                .with_label_values(&[flow_id.to_string().as_str()])
                .start_timer();

            // hack and special handling the insert logical plan
            let req = if let Some((insert_to, insert_plan)) =
                breakup_insert_plan(&plan, catalog, schema)
            {
                let message = DFLogicalSubstraitConvertor {}
                    .encode(&insert_plan, DefaultSerializer)
                    .context(SubstraitEncodeLogicalPlanSnafu)?;
                api::v1::greptime_request::Request::Query(api::v1::QueryRequest {
                    query: Some(api::v1::query_request::Query::InsertIntoPlan(
                        api::v1::InsertIntoPlan {
                            table_name: Some(insert_to),
                            logical_plan: message.to_vec(),
                        },
                    )),
                })
            } else {
                let message = DFLogicalSubstraitConvertor {}
                    .encode(&plan, DefaultSerializer)
                    .context(SubstraitEncodeLogicalPlanSnafu)?;

                api::v1::greptime_request::Request::Query(api::v1::QueryRequest {
                    query: Some(api::v1::query_request::Query::LogicalPlan(message.to_vec())),
                })
            };

            frontend_client
                .handle(req, catalog, schema, &mut peer_desc)
                .await
        };

        let elapsed = instant.elapsed();
        if let Ok(affected_rows) = &res {
            debug!(
                "Flow {flow_id} executed, affected_rows: {affected_rows:?}, elapsed: {:?}",
                elapsed
            );
            METRIC_FLOW_ROWS
                .with_label_values(&[format!("{}-out-batching", flow_id).as_str()])
                .inc_by(*affected_rows as _);
        } else if let Err(err) = &res {
            warn!(
                "Failed to execute Flow {flow_id} on frontend {:?}, result: {err:?}, elapsed: {:?} with query: {}",
                peer_desc, elapsed, &plan
            );
        }

        // record slow query
        if elapsed >= self.config.batch_opts.slow_query_threshold {
            warn!(
                "Flow {flow_id} on frontend {:?} executed for {:?} before complete, query: {}",
                peer_desc, elapsed, &plan
            );
            METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY
                .with_label_values(&[
                    flow_id.to_string().as_str(),
                    &peer_desc.unwrap_or_default().to_string(),
                ])
                .observe(elapsed.as_secs_f64());
        }

        // Update execution bookkeeping whether the query succeeded or failed,
        // before the error (if any) is propagated.
        self.state
            .write()
            .unwrap()
            .after_query_exec(elapsed, res.is_ok());

        let res = res?;

        Ok(Some((res, elapsed)))
    }
466
    /// start executing query in a loop, break when receive shutdown signal
    ///
    /// any error will be logged when executing query
    pub async fn start_executing_loop(
        &self,
        engine: QueryEngineRef,
        frontend_client: Arc<FrontendClient>,
    ) {
        let flow_id_str = self.config.flow_id.to_string();
        // Cap on how many dirty time windows one query may cover; stays `None`
        // until a failed query forces backoff to `Some(1)`, after which each
        // success raises it again (bounded by the configured maximum).
        let mut max_window_cnt = None;
        let mut interval = self
            .config
            .flow_eval_interval
            .map(|d| tokio::time::interval(d));
        if let Some(tick) = &mut interval {
            tick.tick().await; // pass the first tick immediately
        }
        loop {
            // first check if shutdown signal is received
            // if so, break the loop
            {
                let mut state = self.state.write().unwrap();
                match state.shutdown_rx.try_recv() {
                    Ok(()) => break,
                    Err(TryRecvError::Closed) => {
                        // sender dropped without an explicit signal; treat as shutdown
                        warn!(
                            "Unexpected shutdown flow {}, shutdown anyway",
                            self.config.flow_id
                        );
                        break;
                    }
                    Err(TryRecvError::Empty) => (),
                }
            }
            METRIC_FLOW_BATCHING_ENGINE_START_QUERY_CNT
                .with_label_values(&[&flow_id_str])
                .inc();

            let min_refresh = self.config.batch_opts.experimental_min_refresh_duration;

            let new_query = match self.gen_insert_plan(&engine, max_window_cnt).await {
                Ok(new_query) => new_query,
                Err(err) => {
                    common_telemetry::error!(err; "Failed to generate query for flow={}", self.config.flow_id);
                    // also sleep for a little while before try again to prevent flooding logs
                    tokio::time::sleep(min_refresh).await;
                    continue;
                }
            };

            let res = if let Some(new_query) = &new_query {
                self.execute_logical_plan(&frontend_client, &new_query.plan)
                    .await
            } else {
                Ok(None)
            };

            match res {
                // normal execute, sleep for some time before doing next query
                Ok(Some(_)) => {
                    // can increase max_window_cnt to query more windows next time
                    max_window_cnt = max_window_cnt.map(|cnt| {
                        (cnt + 1).min(self.config.batch_opts.experimental_max_filter_num_per_query)
                    });

                    // here use proper ticking if set eval interval
                    if let Some(eval_interval) = &mut interval {
                        eval_interval.tick().await;
                    } else {
                        // if not explicitly set, just automatically calculate next start time
                        // using time window size and more args
                        let sleep_until = {
                            let state = self.state.write().unwrap();

                            let time_window_size = self
                                .config
                                .time_window_expr
                                .as_ref()
                                .and_then(|t| *t.time_window_size());

                            state.get_next_start_query_time(
                                self.config.flow_id,
                                &time_window_size,
                                min_refresh,
                                Some(self.config.batch_opts.query_timeout),
                                self.config.batch_opts.experimental_max_filter_num_per_query,
                            )
                        };

                        tokio::time::sleep_until(sleep_until).await;
                    };
                }
                // no new data, sleep for some time before checking for new data
                Ok(None) => {
                    debug!(
                        "Flow id = {:?} found no new data, sleep for {:?} then continue",
                        self.config.flow_id, min_refresh
                    );
                    tokio::time::sleep(min_refresh).await;
                    continue;
                }
                // TODO(discord9): this error should have better place to go, but for now just print error, also more context is needed
                Err(err) => {
                    METRIC_FLOW_BATCHING_ENGINE_ERROR_CNT
                        .with_label_values(&[&flow_id_str])
                        .inc();
                    match new_query {
                        Some(query) => {
                            common_telemetry::error!(err; "Failed to execute query for flow={} with query: {}", self.config.flow_id, query.plan);
                            // Re-add dirty windows back since query failed
                            self.state.write().unwrap().dirty_time_windows.add_windows(
                                query.filter.map(|f| f.time_ranges).unwrap_or_default(),
                            );
                            // TODO(discord9): add some backoff here? half the query time window or what
                            // backoff meaning use smaller `max_window_cnt` for next query

                            // since last query failed, we should not try to query too many windows
                            max_window_cnt = Some(1);
                        }
                        None => {
                            common_telemetry::error!(err; "Failed to generate query for flow={}", self.config.flow_id)
                        }
                    }
                    // also sleep for a little while before try again to prevent flooding logs
                    tokio::time::sleep(min_refresh).await;
                }
            }
        }
    }
596
597    /// Generate the create table SQL
598    ///
599    /// the auto created table will automatically added a `update_at` Milliseconds DEFAULT now() column in the end
600    /// (for compatibility with flow streaming mode)
601    ///
602    /// and it will use first timestamp column as time index, all other columns will be added as normal columns and nullable
603    async fn gen_create_table_expr(
604        &self,
605        engine: QueryEngineRef,
606    ) -> Result<CreateTableExpr, Error> {
607        let query_ctx = self.state.read().unwrap().query_ctx.clone();
608        let plan =
609            sql_to_df_plan(query_ctx.clone(), engine.clone(), &self.config.query, true).await?;
610        create_table_with_expr(&plan, &self.config.sink_table_name, &self.config.query_type)
611    }
612
    /// Build the time-window-filtered query plan for the next incremental run.
    ///
    /// Merges dirty time windows and queries at most `max_window_cnt` of them
    /// (default taken from `experimental_max_filter_num_per_query`). Falls back
    /// to the full, unfiltered query for TQL or when no time-window bounds can
    /// be derived. Returns `Ok(None)` when there is nothing to update.
    async fn gen_query_with_time_window(
        &self,
        engine: QueryEngineRef,
        sink_table_schema: &Arc<Schema>,
        primary_key_indices: &[usize],
        allow_partial: bool,
        max_window_cnt: Option<usize>,
    ) -> Result<Option<PlanInfo>, Error> {
        let query_ctx = self.state.read().unwrap().query_ctx.clone();
        let start = SystemTime::now();
        let since_the_epoch = start
            .duration_since(UNIX_EPOCH)
            .expect("Time went backwards");
        // Raw expiry cutoff in epoch seconds: `now - expire_after`, or 0 when
        // no expiry is configured.
        let low_bound = self
            .config
            .expire_after
            .map(|e| since_the_epoch.as_secs() - e as u64)
            .unwrap_or(u64::MIN);

        let low_bound = Timestamp::new_second(low_bound as i64);

        // Align the raw cutoff to time-window boundaries via the window expr.
        let expire_time_window_bound = self
            .config
            .time_window_expr
            .as_ref()
            .map(|expr| expr.eval(low_bound))
            .transpose()?;

        let (expire_lower_bound, expire_upper_bound) =
            match (expire_time_window_bound, &self.config.query_type) {
                (Some((Some(l), Some(u))), QueryType::Sql) => (l, u),
                (None, QueryType::Sql) => {
                    // if it's sql query and no time window lower/upper bound is found, just return the original query(with auto columns)
                    // use sink_table_meta to add to query the `update_at` and `__ts_placeholder` column's value too for compatibility reason
                    debug!(
                        "Flow id = {:?}, no time window, using the same query",
                        self.config.flow_id
                    );
                    // clean dirty time window too, this could be from create flow's check_execute
                    let is_dirty = !self.state.read().unwrap().dirty_time_windows.is_empty();
                    self.state.write().unwrap().dirty_time_windows.clean();

                    if !is_dirty {
                        // no dirty data, hence no need to update
                        debug!("Flow id={:?}, no new data, not update", self.config.flow_id);
                        return Ok(None);
                    }

                    let plan = gen_plan_with_matching_schema(
                        &self.config.query,
                        query_ctx,
                        engine,
                        sink_table_schema.clone(),
                        primary_key_indices,
                        allow_partial,
                    )
                    .await?;

                    return Ok(Some(PlanInfo { plan, filter: None }));
                }
                _ => {
                    // clean for tql have no use for time window
                    self.state.write().unwrap().dirty_time_windows.clean();

                    let plan = gen_plan_with_matching_schema(
                        &self.config.query,
                        query_ctx,
                        engine,
                        sink_table_schema.clone(),
                        primary_key_indices,
                        allow_partial,
                    )
                    .await?;

                    return Ok(Some(PlanInfo { plan, filter: None }));
                }
            };

        debug!(
            "Flow id = {:?}, found time window: precise_lower_bound={:?}, precise_upper_bound={:?} with dirty time windows: {:?}",
            self.config.flow_id,
            expire_lower_bound,
            expire_upper_bound,
            self.state.read().unwrap().dirty_time_windows
        );
        let window_size = expire_upper_bound
            .sub(&expire_lower_bound)
            .with_context(|| UnexpectedSnafu {
                reason: format!(
                    "Can't get window size from {expire_upper_bound:?} - {expire_lower_bound:?}"
                ),
            })?;
        let col_name = self
            .config
            .time_window_expr
            .as_ref()
            .map(|expr| expr.column_name.clone())
            .with_context(|| UnexpectedSnafu {
                reason: format!(
                    "Flow id={:?}, Failed to get column name from time window expr",
                    self.config.flow_id
                ),
            })?;

        // Consume dirty windows (up to max_window_cnt) and turn them into a
        // filter expression on the time-window column.
        let expr = self
            .state
            .write()
            .unwrap()
            .dirty_time_windows
            .gen_filter_exprs(
                &col_name,
                Some(expire_lower_bound),
                window_size,
                max_window_cnt
                    .unwrap_or(self.config.batch_opts.experimental_max_filter_num_per_query),
                self.config.flow_id,
                Some(self),
            )?;

        debug!(
            "Flow id={:?}, Generated filter expr: {:?}",
            self.config.flow_id,
            expr.as_ref()
                .map(
                    |expr| expr_to_sql(&expr.expr).with_context(|_| DatafusionSnafu {
                        context: format!("Failed to generate filter expr from {expr:?}"),
                    })
                )
                .transpose()?
                .map(|s| s.to_string())
        );

        let Some(expr) = expr else {
            // no new data, hence no need to update
            debug!("Flow id={:?}, no new data, not update", self.config.flow_id);
            return Ok(None);
        };

        let mut add_filter = AddFilterRewriter::new(expr.expr.clone());
        let mut add_auto_column = ColumnMatcherRewriter::new(
            sink_table_schema.clone(),
            primary_key_indices.to_vec(),
            allow_partial,
        );

        let plan =
            sql_to_df_plan(query_ctx.clone(), engine.clone(), &self.config.query, false).await?;
        let rewrite = plan
            .clone()
            .rewrite(&mut add_filter)
            .and_then(|p| p.data.rewrite(&mut add_auto_column))
            .with_context(|_| DatafusionSnafu {
                context: format!("Failed to rewrite plan:\n {}\n", plan),
            })?
            .data;
        // only apply optimize after complex rewrite is done
        let new_plan = apply_df_optimizer(rewrite, &query_ctx).await?;

        let info = PlanInfo {
            plan: new_plan.clone(),
            filter: Some(expr),
        };

        Ok(Some(info))
    }
779}
780
// The auto-created table has an auto-added `update_at` column, and optionally an `AUTO_CREATED_PLACEHOLDER_TS_COL` column as a time-index placeholder when no timestamp column is specified.
// TODO(discord9): for now no default value is set for auto-added columns, for compatibility with streaming mode; this might change in favor of simpler code?
783fn create_table_with_expr(
784    plan: &LogicalPlan,
785    sink_table_name: &[String; 3],
786    query_type: &QueryType,
787) -> Result<CreateTableExpr, Error> {
788    let table_def = match query_type {
789        &QueryType::Sql => {
790            if let Some(def) = build_pk_from_aggr(plan)? {
791                def
792            } else {
793                build_by_sql_schema(plan)?
794            }
795        }
796        QueryType::Tql => {
797            // first try build from aggr, then from tql schema because tql query might not have aggr node
798            if let Some(table_def) = build_pk_from_aggr(plan)? {
799                table_def
800            } else {
801                build_by_tql_schema(plan)?
802            }
803        }
804    };
805    let first_time_stamp = table_def.ts_col;
806    let primary_keys = table_def.pks;
807
808    let mut column_schemas = Vec::new();
809    for field in plan.schema().fields() {
810        let name = field.name();
811        let ty = ConcreteDataType::from_arrow_type(field.data_type());
812        let col_schema = if first_time_stamp == Some(name.clone()) {
813            ColumnSchema::new(name, ty, false).with_time_index(true)
814        } else {
815            ColumnSchema::new(name, ty, true)
816        };
817
818        match query_type {
819            QueryType::Sql => {
820                column_schemas.push(col_schema);
821            }
822            QueryType::Tql => {
823                // if is val column, need to rename as val DOUBLE NULL
824                // if is tag column, need to cast type as STRING NULL
825                let is_tag_column = primary_keys.contains(name);
826                let is_val_column = !is_tag_column && first_time_stamp.as_ref() != Some(name);
827                if is_val_column {
828                    let col_schema =
829                        ColumnSchema::new(name, ConcreteDataType::float64_datatype(), true);
830                    column_schemas.push(col_schema);
831                } else if is_tag_column {
832                    let col_schema =
833                        ColumnSchema::new(name, ConcreteDataType::string_datatype(), true);
834                    column_schemas.push(col_schema);
835                } else {
836                    // time index column
837                    column_schemas.push(col_schema);
838                }
839            }
840        }
841    }
842
843    if query_type == &QueryType::Sql {
844        let update_at_schema = ColumnSchema::new(
845            AUTO_CREATED_UPDATE_AT_TS_COL,
846            ConcreteDataType::timestamp_millisecond_datatype(),
847            true,
848        );
849        column_schemas.push(update_at_schema);
850    }
851
852    let time_index = if let Some(time_index) = first_time_stamp {
853        time_index
854    } else {
855        column_schemas.push(
856            ColumnSchema::new(
857                AUTO_CREATED_PLACEHOLDER_TS_COL,
858                ConcreteDataType::timestamp_millisecond_datatype(),
859                false,
860            )
861            .with_time_index(true),
862        );
863        AUTO_CREATED_PLACEHOLDER_TS_COL.to_string()
864    };
865
866    let column_defs =
867        column_schemas_to_defs(column_schemas, &primary_keys).context(ConvertColumnSchemaSnafu)?;
868    Ok(CreateTableExpr {
869        catalog_name: sink_table_name[0].clone(),
870        schema_name: sink_table_name[1].clone(),
871        table_name: sink_table_name[2].clone(),
872        desc: "Auto created table by flow engine".to_string(),
873        column_defs,
874        time_index,
875        primary_keys,
876        create_if_not_exists: true,
877        table_options: Default::default(),
878        table_id: None,
879        engine: "mito".to_string(),
880    })
881}
882
883/// simply build by schema, return first timestamp column and no primary key
884fn build_by_sql_schema(plan: &LogicalPlan) -> Result<TableDef, Error> {
885    let first_time_stamp = plan.schema().fields().iter().find_map(|f| {
886        if ConcreteDataType::from_arrow_type(f.data_type()).is_timestamp() {
887            Some(f.name().clone())
888        } else {
889            None
890        }
891    });
892    Ok(TableDef {
893        ts_col: first_time_stamp,
894        pks: vec![],
895    })
896}
897
898/// Return first timestamp column found in output schema and all string columns
899fn build_by_tql_schema(plan: &LogicalPlan) -> Result<TableDef, Error> {
900    let first_time_stamp = plan.schema().fields().iter().find_map(|f| {
901        if ConcreteDataType::from_arrow_type(f.data_type()).is_timestamp() {
902            Some(f.name().clone())
903        } else {
904            None
905        }
906    });
907    let string_columns = plan
908        .schema()
909        .fields()
910        .iter()
911        .filter_map(|f| {
912            if ConcreteDataType::from_arrow_type(f.data_type()).is_string() {
913                Some(f.name().clone())
914            } else {
915                None
916            }
917        })
918        .collect::<Vec<_>>();
919
920    Ok(TableDef {
921        ts_col: first_time_stamp,
922        pks: string_columns,
923    })
924}
925
/// Table definition derived from a query's logical plan, used to shape the
/// auto-created sink table.
struct TableDef {
    /// First timestamp column to use as the time index; `None` means a
    /// placeholder time index column will be added instead.
    ts_col: Option<String>,
    /// Primary key column names (e.g. group-by columns, or string columns for TQL).
    pks: Vec<String>,
}
930
931/// Return first timestamp column which is in group by clause and other columns which are also in group by clause
932///
933/// # Returns
934///
935/// * `Option<String>` - first timestamp column which is in group by clause
936/// * `Vec<String>` - other columns which are also in group by clause
937///
938/// if no aggregation found, return None
939fn build_pk_from_aggr(plan: &LogicalPlan) -> Result<Option<TableDef>, Error> {
940    let fields = plan.schema().fields();
941    let mut pk_names = FindGroupByFinalName::default();
942
943    plan.visit(&mut pk_names)
944        .with_context(|_| DatafusionSnafu {
945            context: format!("Can't find aggr expr in plan {plan:?}"),
946        })?;
947
948    // if no group by clause, return empty with first timestamp column found in output schema
949    let Some(pk_final_names) = pk_names.get_group_expr_names() else {
950        return Ok(None);
951    };
952    if pk_final_names.is_empty() {
953        let first_ts_col = fields
954            .iter()
955            .find(|f| ConcreteDataType::from_arrow_type(f.data_type()).is_timestamp())
956            .map(|f| f.name().clone());
957        return Ok(Some(TableDef {
958            ts_col: first_ts_col,
959            pks: vec![],
960        }));
961    }
962
963    let all_pk_cols: Vec<_> = fields
964        .iter()
965        .filter(|f| pk_final_names.contains(f.name()))
966        .map(|f| f.name().clone())
967        .collect();
968    // auto create table use first timestamp column in group by clause as time index
969    let first_time_stamp = fields
970        .iter()
971        .find(|f| {
972            all_pk_cols.contains(&f.name().clone())
973                && ConcreteDataType::from_arrow_type(f.data_type()).is_timestamp()
974        })
975        .map(|f| f.name().clone());
976
977    let all_pk_cols: Vec<_> = all_pk_cols
978        .into_iter()
979        .filter(|col| first_time_stamp.as_ref() != Some(col))
980        .collect();
981
982    Ok(Some(TableDef {
983        ts_col: first_time_stamp,
984        pks: all_pk_cols,
985    }))
986}
987
#[cfg(test)]
mod test {
    use api::v1::column_def::try_as_column_schema;
    use pretty_assertions::assert_eq;
    use session::context::QueryContext;

    use super::*;
    use crate::test_utils::create_test_query_engine;

    /// Verify that `create_table_with_expr` derives the expected column schemas,
    /// primary keys and time index for SQL-type queries.
    #[tokio::test]
    async fn test_gen_create_table_sql() {
        let query_engine = create_test_query_engine();
        let ctx = QueryContext::arc();
        // One expected sink-table definition per input SQL query.
        struct TestCase {
            sql: String,
            sink_table_name: String,
            // expected column schemas of the auto-created table
            column_schemas: Vec<ColumnSchema>,
            // expected primary key column names
            primary_keys: Vec<String>,
            // expected time index column name
            time_index: String,
        }

        // The `update_at` column auto-added to every SQL-flow sink table.
        let update_at_schema = ColumnSchema::new(
            AUTO_CREATED_UPDATE_AT_TS_COL,
            ConcreteDataType::timestamp_millisecond_datatype(),
            true,
        );

        // Placeholder time index column, added when the plan yields no usable
        // timestamp column.
        let ts_placeholder_schema = ColumnSchema::new(
            AUTO_CREATED_PLACEHOLDER_TS_COL,
            ConcreteDataType::timestamp_millisecond_datatype(),
            false,
        )
        .with_time_index(true);

        let testcases = vec![
            // Plain projection: the first timestamp column (`ts`) becomes the time index.
            TestCase {
                sql: "SELECT number, ts FROM numbers_with_ts".to_string(),
                sink_table_name: "new_table".to_string(),
                column_schemas: vec![
                    ColumnSchema::new("number", ConcreteDataType::uint32_datatype(), true),
                    ColumnSchema::new(
                        "ts",
                        ConcreteDataType::timestamp_millisecond_datatype(),
                        false,
                    )
                    .with_time_index(true),
                    update_at_schema.clone(),
                ],
                primary_keys: vec![],
                time_index: "ts".to_string(),
            },
            // GROUP BY a non-timestamp column: `max(ts)` is a plain value column,
            // so a placeholder time index is added and `number` becomes the primary key.
            TestCase {
                sql: "SELECT number, max(ts) FROM numbers_with_ts GROUP BY number".to_string(),
                sink_table_name: "new_table".to_string(),
                column_schemas: vec![
                    ColumnSchema::new("number", ConcreteDataType::uint32_datatype(), true),
                    ColumnSchema::new(
                        "max(numbers_with_ts.ts)",
                        ConcreteDataType::timestamp_millisecond_datatype(),
                        true,
                    ),
                    update_at_schema.clone(),
                    ts_placeholder_schema.clone(),
                ],
                primary_keys: vec!["number".to_string()],
                time_index: AUTO_CREATED_PLACEHOLDER_TS_COL.to_string(),
            },
            // GROUP BY the timestamp column only: `ts` is the time index and no
            // primary key remains.
            TestCase {
                sql: "SELECT max(number), ts FROM numbers_with_ts GROUP BY ts".to_string(),
                sink_table_name: "new_table".to_string(),
                column_schemas: vec![
                    ColumnSchema::new(
                        "max(numbers_with_ts.number)",
                        ConcreteDataType::uint32_datatype(),
                        true,
                    ),
                    ColumnSchema::new(
                        "ts",
                        ConcreteDataType::timestamp_millisecond_datatype(),
                        false,
                    )
                    .with_time_index(true),
                    update_at_schema.clone(),
                ],
                primary_keys: vec![],
                time_index: "ts".to_string(),
            },
            // GROUP BY both columns: `ts` becomes the time index and is excluded
            // from the primary keys, leaving only `number`.
            TestCase {
                sql: "SELECT number, ts FROM numbers_with_ts GROUP BY ts, number".to_string(),
                sink_table_name: "new_table".to_string(),
                column_schemas: vec![
                    ColumnSchema::new("number", ConcreteDataType::uint32_datatype(), true),
                    ColumnSchema::new(
                        "ts",
                        ConcreteDataType::timestamp_millisecond_datatype(),
                        false,
                    )
                    .with_time_index(true),
                    update_at_schema.clone(),
                ],
                primary_keys: vec!["number".to_string()],
                time_index: "ts".to_string(),
            },
        ];

        for tc in testcases {
            // Build the optimized logical plan, then the create-table expression from it.
            let plan = sql_to_df_plan(ctx.clone(), query_engine.clone(), &tc.sql, true)
                .await
                .unwrap();
            let expr = create_table_with_expr(
                &plan,
                &[
                    "greptime".to_string(),
                    "public".to_string(),
                    tc.sink_table_name.clone(),
                ],
                &QueryType::Sql,
            )
            .unwrap();
            // TODO(discord9): assert expr
            let column_schemas = expr
                .column_defs
                .iter()
                .map(|c| try_as_column_schema(c).unwrap())
                .collect::<Vec<_>>();
            assert_eq!(tc.column_schemas, column_schemas, "{:?}", tc.sql);
            assert_eq!(tc.primary_keys, expr.primary_keys, "{:?}", tc.sql);
            assert_eq!(tc.time_index, expr.time_index, "{:?}", tc.sql);
        }
    }
}