query/datafusion.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Planner, QueryEngine implementations based on DataFusion.

mod error;
mod planner;

use std::any::Any;
use std::collections::HashMap;
use std::sync::Arc;

use async_trait::async_trait;
use common_base::Plugins;
use common_catalog::consts::is_readonly_schema;
use common_error::ext::BoxedError;
use common_function::function_factory::ScalarFunctionFactory;
use common_query::{Output, OutputData, OutputMeta};
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::{EmptyRecordBatchStream, SendableRecordBatchStream};
use common_telemetry::tracing;
use datafusion::catalog::TableFunction;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_plan::analyze::AnalyzeExec;
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion_common::ResolvedTableReference;
use datafusion_expr::{
    AggregateUDF, DmlStatement, LogicalPlan as DfLogicalPlan, LogicalPlan, WriteOp,
};
use datatypes::prelude::VectorRef;
use datatypes::schema::Schema;
use futures_util::StreamExt;
use session::context::QueryContextRef;
use snafu::{OptionExt, ResultExt, ensure};
use sqlparser::ast::AnalyzeFormat;
use table::TableRef;
use table::requests::{DeleteRequest, InsertRequest};

use crate::analyze::DistAnalyzeExec;
use crate::dataframe::DataFrame;
pub use crate::datafusion::planner::DfContextProviderAdapter;
use crate::dist_plan::{DistPlannerOptions, MergeScanLogicalPlan};
use crate::error::{
    CatalogSnafu, ConvertSchemaSnafu, CreateRecordBatchSnafu, MissingTableMutationHandlerSnafu,
    MissingTimestampColumnSnafu, QueryExecutionSnafu, Result, TableMutationSnafu,
    TableNotFoundSnafu, TableReadOnlySnafu, UnsupportedExprSnafu,
};
use crate::executor::QueryExecutor;
use crate::metrics::{OnDone, QUERY_STAGE_ELAPSED};
use crate::physical_wrapper::PhysicalPlanWrapperRef;
use crate::planner::{DfLogicalPlanner, LogicalPlanner};
use crate::query_engine::{DescribeResult, QueryEngineContext, QueryEngineState};
use crate::{QueryEngine, metrics};

/// Query parallelism hint key.
/// This hint can be set in the query context to control the parallelism of the query execution.
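///
/// A hedged usage sketch (the `set_extension` setter name is an assumption; the
/// engine reads the hint back via `QueryContext::extension` in `engine_context` below):
///
/// ```ignore
/// // Ask the engine to plan and execute this query with 8 target partitions.
/// query_ctx.set_extension(QUERY_PARALLELISM_HINT.to_string(), "8".to_string());
/// ```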
pub const QUERY_PARALLELISM_HINT: &str = "query_parallelism";

/// Whether to fall back to the original plan when push-down fails.
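///
/// A hedged sketch mirroring the one for `QUERY_PARALLELISM_HINT` above (the
/// `set_extension` setter name is again an assumption):
///
/// ```ignore
/// // Allow this query to fall back to the plan without push-down.
/// query_ctx.set_extension(QUERY_FALLBACK_HINT.to_string(), "true".to_string());
/// ```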
pub const QUERY_FALLBACK_HINT: &str = "query_fallback";

pub struct DatafusionQueryEngine {
    state: Arc<QueryEngineState>,
    plugins: Plugins,
}

impl DatafusionQueryEngine {
    pub fn new(state: Arc<QueryEngineState>, plugins: Plugins) -> Self {
        Self { state, plugins }
    }

    #[tracing::instrument(skip_all)]
    async fn exec_query_plan(
        &self,
        plan: LogicalPlan,
        query_ctx: QueryContextRef,
    ) -> Result<Output> {
        let mut ctx = self.engine_context(query_ctx.clone());

        // `create_physical_plan` optimizes the logical plan internally
        let physical_plan = self.create_physical_plan(&mut ctx, &plan).await?;
        let optimized_physical_plan = self.optimize_physical_plan(&mut ctx, physical_plan)?;

        let physical_plan = if let Some(wrapper) = self.plugins.get::<PhysicalPlanWrapperRef>() {
            wrapper.wrap(optimized_physical_plan, query_ctx)
        } else {
            optimized_physical_plan
        };

        Ok(Output::new(
            OutputData::Stream(self.execute_stream(&ctx, &physical_plan)?),
            OutputMeta::new_with_plan(physical_plan),
        ))
    }

    #[tracing::instrument(skip_all)]
    async fn exec_dml_statement(
        &self,
        dml: DmlStatement,
        query_ctx: QueryContextRef,
    ) -> Result<Output> {
        ensure!(
            matches!(dml.op, WriteOp::Insert(_) | WriteOp::Delete),
            UnsupportedExprSnafu {
                name: format!("DML op {}", dml.op),
            }
        );

        let _timer = QUERY_STAGE_ELAPSED
            .with_label_values(&[dml.op.name()])
            .start_timer();

        let default_catalog = &query_ctx.current_catalog().to_owned();
        let default_schema = &query_ctx.current_schema();
        let table_name = dml.table_name.resolve(default_catalog, default_schema);
        let table = self.find_table(&table_name, &query_ctx).await?;

        let output = self
            .exec_query_plan((*dml.input).clone(), query_ctx.clone())
            .await?;
        let mut stream = match output.data {
            OutputData::RecordBatches(batches) => batches.as_stream(),
            OutputData::Stream(stream) => stream,
            _ => unreachable!(),
        };

        let mut affected_rows = 0;
        let mut insert_cost = 0;

        while let Some(batch) = stream.next().await {
            let batch = batch.context(CreateRecordBatchSnafu)?;
            let column_vectors = batch
                .column_vectors(&table_name.to_string(), table.schema())
                .map_err(BoxedError::new)
                .context(QueryExecutionSnafu)?;

            match dml.op {
                WriteOp::Insert(_) => {
                    // The specific kind of insert op (e.g. append or overwrite) is ignored here.
                    let output = self
                        .insert(&table_name, column_vectors, query_ctx.clone())
                        .await?;
                    let (rows, cost) = output.extract_rows_and_cost();
                    affected_rows += rows;
                    insert_cost += cost;
                }
                WriteOp::Delete => {
                    affected_rows += self
                        .delete(&table_name, &table, column_vectors, query_ctx.clone())
                        .await?;
                }
                _ => unreachable!("guarded by the 'ensure!' at the beginning"),
            }
        }
        Ok(Output::new(
            OutputData::AffectedRows(affected_rows),
            OutputMeta::new_with_cost(insert_cost),
        ))
    }

    #[tracing::instrument(skip_all)]
    async fn delete(
        &self,
        table_name: &ResolvedTableReference,
        table: &TableRef,
        column_vectors: HashMap<String, VectorRef>,
        query_ctx: QueryContextRef,
    ) -> Result<usize> {
        let catalog_name = table_name.catalog.to_string();
        let schema_name = table_name.schema.to_string();
        let table_name = table_name.table.to_string();
        let table_schema = table.schema();

        ensure!(
            !is_readonly_schema(&schema_name),
            TableReadOnlySnafu { table: table_name }
        );

        let ts_column = table_schema
            .timestamp_column()
            .map(|x| &x.name)
            .with_context(|| MissingTimestampColumnSnafu {
                table_name: table_name.to_string(),
            })?;

        let table_info = table.table_info();
        let rowkey_columns = table_info
            .meta
            .row_key_column_names()
            .collect::<Vec<&String>>();
        let column_vectors = column_vectors
            .into_iter()
            .filter(|x| &x.0 == ts_column || rowkey_columns.contains(&&x.0))
            .collect::<HashMap<_, _>>();

        let request = DeleteRequest {
            catalog_name,
            schema_name,
            table_name,
            key_column_values: column_vectors,
        };

        self.state
            .table_mutation_handler()
            .context(MissingTableMutationHandlerSnafu)?
            .delete(request, query_ctx)
            .await
            .context(TableMutationSnafu)
    }

    #[tracing::instrument(skip_all)]
    async fn insert(
        &self,
        table_name: &ResolvedTableReference,
        column_vectors: HashMap<String, VectorRef>,
        query_ctx: QueryContextRef,
    ) -> Result<Output> {
        let catalog_name = table_name.catalog.to_string();
        let schema_name = table_name.schema.to_string();
        let table_name = table_name.table.to_string();

        ensure!(
            !is_readonly_schema(&schema_name),
            TableReadOnlySnafu { table: table_name }
        );

        let request = InsertRequest {
            catalog_name,
            schema_name,
            table_name,
            columns_values: column_vectors,
        };

        self.state
            .table_mutation_handler()
            .context(MissingTableMutationHandlerSnafu)?
            .insert(request, query_ctx)
            .await
            .context(TableMutationSnafu)
    }

    async fn find_table(
        &self,
        table_name: &ResolvedTableReference,
        query_context: &QueryContextRef,
    ) -> Result<TableRef> {
        let catalog_name = table_name.catalog.as_ref();
        let schema_name = table_name.schema.as_ref();
        let table_name = table_name.table.as_ref();

        self.state
            .catalog_manager()
            .table(catalog_name, schema_name, table_name, Some(query_context))
            .await
            .context(CatalogSnafu)?
            .with_context(|| TableNotFoundSnafu { table: table_name })
    }

    #[tracing::instrument(skip_all)]
    async fn create_physical_plan(
        &self,
        ctx: &mut QueryEngineContext,
        logical_plan: &LogicalPlan,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        /// Only print context on panic, to avoid cluttering logs.
        ///
        /// TODO(discord9): remove this once we catch the bug
        #[derive(Debug)]
        struct PanicLogger<'a> {
            input_logical_plan: &'a LogicalPlan,
            after_analyze: Option<LogicalPlan>,
            after_optimize: Option<LogicalPlan>,
            phy_plan: Option<Arc<dyn ExecutionPlan>>,
        }
        impl Drop for PanicLogger<'_> {
            fn drop(&mut self) {
                if std::thread::panicking() {
                    common_telemetry::error!(
                        "Panic while creating physical plan, input logical plan: {:?}, after analyze: {:?}, after optimize: {:?}, final physical plan: {:?}",
                        self.input_logical_plan,
                        self.after_analyze,
                        self.after_optimize,
                        self.phy_plan
                    );
                }
            }
        }

        let mut logger = PanicLogger {
            input_logical_plan: logical_plan,
            after_analyze: None,
            after_optimize: None,
            phy_plan: None,
        };

        let _timer = metrics::CREATE_PHYSICAL_ELAPSED.start_timer();
        let state = ctx.state();

        common_telemetry::debug!("Create physical plan, input plan: {logical_plan}");

        // Special handling for EXPLAIN plans
        if matches!(logical_plan, DfLogicalPlan::Explain(_)) {
            return state
                .create_physical_plan(logical_plan)
                .await
                .context(error::DatafusionSnafu)
                .map_err(BoxedError::new)
                .context(QueryExecutionSnafu);
        }

        // analyze first
        let analyzed_plan = state
            .analyzer()
            .execute_and_check(logical_plan.clone(), state.config_options(), |_, _| {})
            .context(error::DatafusionSnafu)
            .map_err(BoxedError::new)
            .context(QueryExecutionSnafu)?;

        logger.after_analyze = Some(analyzed_plan.clone());

        common_telemetry::debug!("Create physical plan, analyzed plan: {analyzed_plan}");

        // Skip optimization for MergeScan
        let optimized_plan = if let DfLogicalPlan::Extension(ext) = &analyzed_plan
            && ext.node.name() == MergeScanLogicalPlan::name()
        {
            analyzed_plan.clone()
        } else {
            state
                .optimizer()
                .optimize(analyzed_plan, state, |_, _| {})
                .context(error::DatafusionSnafu)
                .map_err(BoxedError::new)
                .context(QueryExecutionSnafu)?
        };

        common_telemetry::debug!("Create physical plan, optimized plan: {optimized_plan}");
        logger.after_optimize = Some(optimized_plan.clone());

        let physical_plan = state
            .query_planner()
            .create_physical_plan(&optimized_plan, state)
            .await?;

        logger.phy_plan = Some(physical_plan.clone());
        drop(logger);
        Ok(physical_plan)
    }

    #[tracing::instrument(skip_all)]
    pub fn optimize(
        &self,
        context: &QueryEngineContext,
        plan: &LogicalPlan,
    ) -> Result<LogicalPlan> {
        let _timer = metrics::OPTIMIZE_LOGICAL_ELAPSED.start_timer();

        // Optimize with extension rules
        let optimized_plan = self
            .state
            .optimize_by_extension_rules(plan.clone(), context)
            .context(error::DatafusionSnafu)
            .map_err(BoxedError::new)
            .context(QueryExecutionSnafu)?;

        // Optimize with the DataFusion optimizer
        let optimized_plan = self
            .state
            .session_state()
            .optimize(&optimized_plan)
            .context(error::DatafusionSnafu)
            .map_err(BoxedError::new)
            .context(QueryExecutionSnafu)?;

        Ok(optimized_plan)
    }

    #[tracing::instrument(skip_all)]
    fn optimize_physical_plan(
        &self,
        ctx: &mut QueryEngineContext,
        plan: Arc<dyn ExecutionPlan>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let _timer = metrics::OPTIMIZE_PHYSICAL_ELAPSED.start_timer();

        // TODO(ruihang): `self.create_physical_plan()` already optimizes the plan; check
        // whether we need to optimize it again here.
        // let state = ctx.state();
        // let config = state.config_options();

        // Skip optimizing the AnalyzeExec plan
        let optimized_plan = if let Some(analyze_plan) = plan.as_any().downcast_ref::<AnalyzeExec>()
        {
            let format = if let Some(format) = ctx.query_ctx().explain_format()
                && format.to_lowercase() == "json"
            {
                AnalyzeFormat::JSON
            } else {
                AnalyzeFormat::TEXT
            };
            // Sets the verbose flag of the query context.
            // The MergeScanExec plan uses the verbose flag to determine whether to print the plan in verbose mode.
            ctx.query_ctx().set_explain_verbose(analyze_plan.verbose());

            Arc::new(DistAnalyzeExec::new(
                analyze_plan.input().clone(),
                analyze_plan.verbose(),
                format,
            ))
            // let mut new_plan = analyze_plan.input().clone();
            // for optimizer in state.physical_optimizers() {
            //     new_plan = optimizer
            //         .optimize(new_plan, config)
            //         .context(DataFusionSnafu)?;
            // }
            // Arc::new(DistAnalyzeExec::new(new_plan))
        } else {
            plan
            // let mut new_plan = plan;
            // for optimizer in state.physical_optimizers() {
            //     new_plan = optimizer
            //         .optimize(new_plan, config)
            //         .context(DataFusionSnafu)?;
            // }
            // new_plan
        };

        Ok(optimized_plan)
    }
}

#[async_trait]
impl QueryEngine for DatafusionQueryEngine {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn planner(&self) -> Arc<dyn LogicalPlanner> {
        Arc::new(DfLogicalPlanner::new(self.state.clone()))
    }

    fn name(&self) -> &str {
        "datafusion"
    }

    async fn describe(
        &self,
        plan: LogicalPlan,
        query_ctx: QueryContextRef,
    ) -> Result<DescribeResult> {
        let ctx = self.engine_context(query_ctx);
        if let Ok(optimised_plan) = self.optimize(&ctx, &plan) {
            let schema = optimised_plan
                .schema()
                .clone()
                .try_into()
                .context(ConvertSchemaSnafu)?;
            Ok(DescribeResult {
                schema,
                logical_plan: optimised_plan,
            })
        } else {
            // Tables like those in information_schema cannot be optimized when
            // the plan contains parameters, so we fall back to the original plan.
            let schema = plan
                .schema()
                .clone()
                .try_into()
                .context(ConvertSchemaSnafu)?;
            Ok(DescribeResult {
                schema,
                logical_plan: plan,
            })
        }
    }

    async fn execute(&self, plan: LogicalPlan, query_ctx: QueryContextRef) -> Result<Output> {
        match plan {
            LogicalPlan::Dml(dml) => self.exec_dml_statement(dml, query_ctx).await,
            _ => self.exec_query_plan(plan, query_ctx).await,
        }
    }

    /// Note that in SQL queries, aggregate names are looked up using
    /// lowercase unless the query uses quotes. For example,
    ///
    /// `SELECT MY_UDAF(x)...` will look for an aggregate named `"my_udaf"`, while
    /// `SELECT "my_UDAF"(x)` will look for an aggregate named `"my_UDAF"`.
    ///
    /// So it's better to make the UDAF name lowercase when creating one.
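    ///
    /// A hedged registration sketch (`MyUdaf` is a hypothetical `AggregateUDFImpl`
    /// implementation, not something provided by this crate):
    ///
    /// ```ignore
    /// use datafusion_expr::AggregateUDF;
    ///
    /// // Register under a lowercase name so unquoted SQL such as `SELECT my_udaf(x)` finds it.
    /// engine.register_aggregate_function(AggregateUDF::new_from_impl(MyUdaf::default()));
    /// ```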
    fn register_aggregate_function(&self, func: AggregateUDF) {
        self.state.register_aggr_function(func);
    }

    /// Register a scalar function.
    /// Will override any function already registered under the same name.
    fn register_scalar_function(&self, func: ScalarFunctionFactory) {
        self.state.register_scalar_function(func);
    }

    fn register_table_function(&self, func: Arc<TableFunction>) {
        self.state.register_table_function(func);
    }

    fn read_table(&self, table: TableRef) -> Result<DataFrame> {
        Ok(DataFrame::DataFusion(
            self.state
                .read_table(table)
                .context(error::DatafusionSnafu)
                .map_err(BoxedError::new)
                .context(QueryExecutionSnafu)?,
        ))
    }

    fn engine_context(&self, query_ctx: QueryContextRef) -> QueryEngineContext {
        let mut state = self.state.session_state();
        state.config_mut().set_extension(query_ctx.clone());
        // Note that hints in the "x-greptime-hints" header are automatically parsed and
        // stored in the query context's extensions, so we can read them from the query context.
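        // As a hedged example of the flow (exact header value syntax depends on the client
        // protocol): a client sending something like `x-greptime-hints: query_parallelism=8`
        // would surface here as `query_ctx.extension(QUERY_PARALLELISM_HINT)` returning "8".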
        if let Some(parallelism) = query_ctx.extension(QUERY_PARALLELISM_HINT) {
            if let Ok(n) = parallelism.parse::<u64>() {
                if n > 0 {
                    let new_cfg = state.config().clone().with_target_partitions(n as usize);
                    *state.config_mut() = new_cfg;
                }
            } else {
                common_telemetry::warn!(
                    "Failed to parse query_parallelism: {}, using default value",
                    parallelism
                );
            }
        }

        // configure execution options
        state.config_mut().options_mut().execution.time_zone = query_ctx.timezone().to_string();

        // Usually it's impossible to have both the `set variable` from a SQL client and the
        // hint header from a gRPC client, so we only need to handle them separately.
        if query_ctx.configuration_parameter().allow_query_fallback() {
            state
                .config_mut()
                .options_mut()
                .extensions
                .insert(DistPlannerOptions {
                    allow_query_fallback: true,
                });
        } else if let Some(fallback) = query_ctx.extension(QUERY_FALLBACK_HINT) {
            // Also check the query context for the fallback hint;
            // if it is set, enable the fallback.
            if fallback.to_lowercase().parse::<bool>().unwrap_or(false) {
                state
                    .config_mut()
                    .options_mut()
                    .extensions
                    .insert(DistPlannerOptions {
                        allow_query_fallback: true,
                    });
            }
        }
        QueryEngineContext::new(state, query_ctx)
    }

    fn engine_state(&self) -> &QueryEngineState {
        &self.state
    }
}

impl QueryExecutor for DatafusionQueryEngine {
    #[tracing::instrument(skip_all)]
    fn execute_stream(
        &self,
        ctx: &QueryEngineContext,
        plan: &Arc<dyn ExecutionPlan>,
    ) -> Result<SendableRecordBatchStream> {
        let explain_verbose = ctx.query_ctx().explain_verbose();
        let output_partitions = plan.properties().output_partitioning().partition_count();
        if explain_verbose {
            common_telemetry::info!("Executing query plan, output_partitions: {output_partitions}");
        }

        let exec_timer = metrics::EXEC_PLAN_ELAPSED.start_timer();
        let task_ctx = ctx.build_task_ctx();

        match plan.properties().output_partitioning().partition_count() {
            0 => {
                let schema = Arc::new(
                    Schema::try_from(plan.schema())
                        .map_err(BoxedError::new)
                        .context(QueryExecutionSnafu)?,
                );
                Ok(Box::pin(EmptyRecordBatchStream::new(schema)))
            }
            1 => {
                let df_stream = plan
                    .execute(0, task_ctx)
                    .context(error::DatafusionSnafu)
                    .map_err(BoxedError::new)
                    .context(QueryExecutionSnafu)?;
                let mut stream = RecordBatchStreamAdapter::try_new(df_stream)
                    .context(error::ConvertDfRecordBatchStreamSnafu)
                    .map_err(BoxedError::new)
                    .context(QueryExecutionSnafu)?;
                stream.set_metrics2(plan.clone());
                stream.set_explain_verbose(explain_verbose);
                let stream = OnDone::new(Box::pin(stream), move || {
                    let exec_cost = exec_timer.stop_and_record();
                    if explain_verbose {
                        common_telemetry::info!(
                            "DatafusionQueryEngine execute 1 stream, cost: {:?}s",
                            exec_cost,
                        );
                    }
                });
                Ok(Box::pin(stream))
            }
            _ => {
                // merge into a single partition
                let merged_plan = CoalescePartitionsExec::new(plan.clone());
                // CoalescePartitionsExec must produce a single partition
                assert_eq!(
                    1,
                    merged_plan
                        .properties()
                        .output_partitioning()
                        .partition_count()
                );
                let df_stream = merged_plan
                    .execute(0, task_ctx)
                    .context(error::DatafusionSnafu)
                    .map_err(BoxedError::new)
                    .context(QueryExecutionSnafu)?;
                let mut stream = RecordBatchStreamAdapter::try_new(df_stream)
                    .context(error::ConvertDfRecordBatchStreamSnafu)
                    .map_err(BoxedError::new)
                    .context(QueryExecutionSnafu)?;
                stream.set_metrics2(plan.clone());
                stream.set_explain_verbose(ctx.query_ctx().explain_verbose());
                let stream = OnDone::new(Box::pin(stream), move || {
                    let exec_cost = exec_timer.stop_and_record();
                    if explain_verbose {
                        common_telemetry::info!(
                            "DatafusionQueryEngine execute {output_partitions} stream, cost: {:?}s",
                            exec_cost
                        );
                    }
                });
                Ok(Box::pin(stream))
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use catalog::RegisterTableRequest;
    use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, NUMBERS_TABLE_ID};
    use common_recordbatch::util;
    use datafusion::prelude::{col, lit};
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::ColumnSchema;
    use datatypes::vectors::{Helper, UInt32Vector, UInt64Vector, VectorRef};
    use session::context::{QueryContext, QueryContextBuilder};
    use table::table::numbers::{NUMBERS_TABLE_NAME, NumbersTable};

    use super::*;
    use crate::options::QueryOptions;
    use crate::parser::QueryLanguageParser;
    use crate::query_engine::{QueryEngineFactory, QueryEngineRef};

    async fn create_test_engine() -> QueryEngineRef {
        let catalog_manager = catalog::memory::new_memory_catalog_manager().unwrap();
        let req = RegisterTableRequest {
            catalog: DEFAULT_CATALOG_NAME.to_string(),
            schema: DEFAULT_SCHEMA_NAME.to_string(),
            table_name: NUMBERS_TABLE_NAME.to_string(),
            table_id: NUMBERS_TABLE_ID,
            table: NumbersTable::table(NUMBERS_TABLE_ID),
        };
        catalog_manager.register_table_sync(req).unwrap();

        QueryEngineFactory::new(
            catalog_manager,
            None,
            None,
            None,
            None,
            false,
            QueryOptions::default(),
        )
        .query_engine()
    }

    #[tokio::test]
    async fn test_sql_to_plan() {
        let engine = create_test_engine().await;
        let sql = "select sum(number) from numbers limit 20";

        let stmt = QueryLanguageParser::parse_sql(sql, &QueryContext::arc()).unwrap();
        let plan = engine
            .planner()
            .plan(&stmt, QueryContext::arc())
            .await
            .unwrap();

        assert_eq!(
            plan.to_string(),
            r#"Limit: skip=0, fetch=20
  Projection: sum(numbers.number)
    Aggregate: groupBy=[[]], aggr=[[sum(numbers.number)]]
      TableScan: numbers"#
        );
    }

    #[tokio::test]
    async fn test_execute() {
        let engine = create_test_engine().await;
        let sql = "select sum(number) from numbers limit 20";

        let stmt = QueryLanguageParser::parse_sql(sql, &QueryContext::arc()).unwrap();
        let plan = engine
            .planner()
            .plan(&stmt, QueryContext::arc())
            .await
            .unwrap();

        let output = engine.execute(plan, QueryContext::arc()).await.unwrap();

        match output.data {
            OutputData::Stream(recordbatch) => {
                let numbers = util::collect(recordbatch).await.unwrap();
                assert_eq!(1, numbers.len());
                assert_eq!(numbers[0].num_columns(), 1);
                assert_eq!(1, numbers[0].schema.num_columns());
                assert_eq!(
                    "sum(numbers.number)",
                    numbers[0].schema.column_schemas()[0].name
                );

                let batch = &numbers[0];
                assert_eq!(1, batch.num_columns());
                assert_eq!(batch.column(0).len(), 1);

                assert_eq!(
                    *batch.column(0),
                    Arc::new(UInt64Vector::from_slice([4950])) as VectorRef
                );
            }
            _ => unreachable!(),
        }
    }

    #[tokio::test]
    async fn test_read_table() {
        let engine = create_test_engine().await;

        let engine = engine
            .as_any()
            .downcast_ref::<DatafusionQueryEngine>()
            .unwrap();
        let query_ctx = Arc::new(QueryContextBuilder::default().build());
        let table = engine
            .find_table(
                &ResolvedTableReference {
                    catalog: "greptime".into(),
                    schema: "public".into(),
                    table: "numbers".into(),
                },
                &query_ctx,
            )
            .await
            .unwrap();

        let DataFrame::DataFusion(df) = engine.read_table(table).unwrap();
        let df = df
            .select_columns(&["number"])
            .unwrap()
            .filter(col("number").lt(lit(10)))
            .unwrap();
        let batches = df.collect().await.unwrap();
        assert_eq!(1, batches.len());
        let batch = &batches[0];

        assert_eq!(1, batch.num_columns());
        assert_eq!(batch.column(0).len(), 10);

        assert_eq!(
            Helper::try_into_vector(batch.column(0)).unwrap(),
            Arc::new(UInt32Vector::from_slice([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])) as VectorRef
        );
    }

    #[tokio::test]
    async fn test_describe() {
        let engine = create_test_engine().await;
        let sql = "select sum(number) from numbers limit 20";

        let stmt = QueryLanguageParser::parse_sql(sql, &QueryContext::arc()).unwrap();

        let plan = engine
            .planner()
            .plan(&stmt, QueryContext::arc())
            .await
            .unwrap();

        let DescribeResult {
            schema,
            logical_plan,
        } = engine.describe(plan, QueryContext::arc()).await.unwrap();

        assert_eq!(
            schema.column_schemas()[0],
            ColumnSchema::new(
                "sum(numbers.number)",
                ConcreteDataType::uint64_datatype(),
                true
            )
        );
        assert_eq!(
            "Limit: skip=0, fetch=20\n  Aggregate: groupBy=[[]], aggr=[[sum(CAST(numbers.number AS UInt64))]]\n    TableScan: numbers projection=[number]",
            format!("{}", logical_plan.display_indent())
        );
    }
}