query/datafusion.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Planner and QueryEngine implementations based on DataFusion.

mod error;
mod planner;

use std::any::Any;
use std::collections::HashMap;
use std::sync::Arc;

use async_trait::async_trait;
use common_base::Plugins;
use common_catalog::consts::is_readonly_schema;
use common_error::ext::BoxedError;
use common_function::function_factory::ScalarFunctionFactory;
use common_query::{Output, OutputData, OutputMeta};
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::{EmptyRecordBatchStream, SendableRecordBatchStream};
use common_telemetry::tracing;
use datafusion::physical_plan::analyze::AnalyzeExec;
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion::physical_plan::ExecutionPlan;
use datafusion_common::ResolvedTableReference;
use datafusion_expr::{
    AggregateUDF, DmlStatement, LogicalPlan as DfLogicalPlan, LogicalPlan, WriteOp,
};
use datatypes::prelude::VectorRef;
use datatypes::schema::Schema;
use futures_util::StreamExt;
use session::context::QueryContextRef;
use snafu::{ensure, OptionExt, ResultExt};
use sqlparser::ast::AnalyzeFormat;
use table::requests::{DeleteRequest, InsertRequest};
use table::TableRef;

use crate::analyze::DistAnalyzeExec;
use crate::dataframe::DataFrame;
pub use crate::datafusion::planner::DfContextProviderAdapter;
use crate::dist_plan::{DistPlannerOptions, MergeScanLogicalPlan};
use crate::error::{
    CatalogSnafu, ConvertSchemaSnafu, CreateRecordBatchSnafu, MissingTableMutationHandlerSnafu,
    MissingTimestampColumnSnafu, QueryExecutionSnafu, Result, TableMutationSnafu,
    TableNotFoundSnafu, TableReadOnlySnafu, UnsupportedExprSnafu,
};
use crate::executor::QueryExecutor;
use crate::metrics::{OnDone, QUERY_STAGE_ELAPSED};
use crate::physical_wrapper::PhysicalPlanWrapperRef;
use crate::planner::{DfLogicalPlanner, LogicalPlanner};
use crate::query_engine::{DescribeResult, QueryEngineContext, QueryEngineState};
use crate::{metrics, QueryEngine};

/// Query parallelism hint key.
/// This hint can be set in the query context to control the parallelism of query execution.
pub const QUERY_PARALLELISM_HINT: &str = "query_parallelism";

/// Whether to fall back to the original plan when pushing down the plan fails.
pub const QUERY_FALLBACK_HINT: &str = "query_fallback";

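/// [`QueryEngine`] implementation backed by Apache DataFusion.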
pub struct DatafusionQueryEngine {
    state: Arc<QueryEngineState>,
    plugins: Plugins,
}

impl DatafusionQueryEngine {
    pub fn new(state: Arc<QueryEngineState>, plugins: Plugins) -> Self {
        Self { state, plugins }
    }

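    /// Creates and optimizes a physical plan for `plan`, then executes it as a record batch stream.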
    #[tracing::instrument(skip_all)]
    async fn exec_query_plan(
        &self,
        plan: LogicalPlan,
        query_ctx: QueryContextRef,
    ) -> Result<Output> {
        let mut ctx = self.engine_context(query_ctx.clone());

        // `create_physical_plan` optimizes the logical plan internally.
        let physical_plan = self.create_physical_plan(&mut ctx, &plan).await?;
        let optimized_physical_plan = self.optimize_physical_plan(&mut ctx, physical_plan)?;

        let physical_plan = if let Some(wrapper) = self.plugins.get::<PhysicalPlanWrapperRef>() {
            wrapper.wrap(optimized_physical_plan, query_ctx)
        } else {
            optimized_physical_plan
        };

        Ok(Output::new(
            OutputData::Stream(self.execute_stream(&ctx, &physical_plan)?),
            OutputMeta::new_with_plan(physical_plan),
        ))
    }

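    /// Executes an `INSERT` or `DELETE` DML statement by running its input plan
    /// and feeding the resulting batches to the table mutation handler.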
    #[tracing::instrument(skip_all)]
    async fn exec_dml_statement(
        &self,
        dml: DmlStatement,
        query_ctx: QueryContextRef,
    ) -> Result<Output> {
        ensure!(
            matches!(dml.op, WriteOp::Insert(_) | WriteOp::Delete),
            UnsupportedExprSnafu {
                name: format!("DML op {}", dml.op),
            }
        );

        let _timer = QUERY_STAGE_ELAPSED
            .with_label_values(&[dml.op.name()])
            .start_timer();

        let default_catalog = &query_ctx.current_catalog().to_owned();
        let default_schema = &query_ctx.current_schema();
        let table_name = dml.table_name.resolve(default_catalog, default_schema);
        let table = self.find_table(&table_name, &query_ctx).await?;

        let output = self
            .exec_query_plan((*dml.input).clone(), query_ctx.clone())
            .await?;
        let mut stream = match output.data {
            OutputData::RecordBatches(batches) => batches.as_stream(),
            OutputData::Stream(stream) => stream,
            _ => unreachable!(),
        };

        let mut affected_rows = 0;
        let mut insert_cost = 0;

        while let Some(batch) = stream.next().await {
            let batch = batch.context(CreateRecordBatchSnafu)?;
            let column_vectors = batch
                .column_vectors(&table_name.to_string(), table.schema())
                .map_err(BoxedError::new)
                .context(QueryExecutionSnafu)?;

            match dml.op {
                WriteOp::Insert(_) => {
                    // The specific kind of insert op is ignored here.
                    let output = self
                        .insert(&table_name, column_vectors, query_ctx.clone())
                        .await?;
                    let (rows, cost) = output.extract_rows_and_cost();
                    affected_rows += rows;
                    insert_cost += cost;
                }
                WriteOp::Delete => {
                    affected_rows += self
                        .delete(&table_name, &table, column_vectors, query_ctx.clone())
                        .await?;
                }
                _ => unreachable!("guarded by the 'ensure!' at the beginning"),
            }
        }
        Ok(Output::new(
            OutputData::AffectedRows(affected_rows),
            OutputMeta::new_with_cost(insert_cost),
        ))
    }

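    /// Deletes rows from the target table using the time index and row-key column
    /// values extracted from the query result.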
    #[tracing::instrument(skip_all)]
    async fn delete(
        &self,
        table_name: &ResolvedTableReference,
        table: &TableRef,
        column_vectors: HashMap<String, VectorRef>,
        query_ctx: QueryContextRef,
    ) -> Result<usize> {
        let catalog_name = table_name.catalog.to_string();
        let schema_name = table_name.schema.to_string();
        let table_name = table_name.table.to_string();
        let table_schema = table.schema();

        ensure!(
            !is_readonly_schema(&schema_name),
            TableReadOnlySnafu { table: table_name }
        );

        let ts_column = table_schema
            .timestamp_column()
            .map(|x| &x.name)
            .with_context(|| MissingTimestampColumnSnafu {
                table_name: table_name.to_string(),
            })?;

        let table_info = table.table_info();
        let rowkey_columns = table_info
            .meta
            .row_key_column_names()
            .collect::<Vec<&String>>();
        let column_vectors = column_vectors
            .into_iter()
            .filter(|x| &x.0 == ts_column || rowkey_columns.contains(&&x.0))
            .collect::<HashMap<_, _>>();

        let request = DeleteRequest {
            catalog_name,
            schema_name,
            table_name,
            key_column_values: column_vectors,
        };

        self.state
            .table_mutation_handler()
            .context(MissingTableMutationHandlerSnafu)?
            .delete(request, query_ctx)
            .await
            .context(TableMutationSnafu)
    }

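    /// Inserts the given column vectors into the target table through the table mutation handler.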
    #[tracing::instrument(skip_all)]
    async fn insert(
        &self,
        table_name: &ResolvedTableReference,
        column_vectors: HashMap<String, VectorRef>,
        query_ctx: QueryContextRef,
    ) -> Result<Output> {
        let catalog_name = table_name.catalog.to_string();
        let schema_name = table_name.schema.to_string();
        let table_name = table_name.table.to_string();

        ensure!(
            !is_readonly_schema(&schema_name),
            TableReadOnlySnafu { table: table_name }
        );

        let request = InsertRequest {
            catalog_name,
            schema_name,
            table_name,
            columns_values: column_vectors,
        };

        self.state
            .table_mutation_handler()
            .context(MissingTableMutationHandlerSnafu)?
            .insert(request, query_ctx)
            .await
            .context(TableMutationSnafu)
    }

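    /// Resolves a table reference through the catalog manager, returning an error if it doesn't exist.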
    async fn find_table(
        &self,
        table_name: &ResolvedTableReference,
        query_context: &QueryContextRef,
    ) -> Result<TableRef> {
        let catalog_name = table_name.catalog.as_ref();
        let schema_name = table_name.schema.as_ref();
        let table_name = table_name.table.as_ref();

        self.state
            .catalog_manager()
            .table(catalog_name, schema_name, table_name, Some(query_context))
            .await
            .context(CatalogSnafu)?
            .with_context(|| TableNotFoundSnafu { table: table_name })
    }

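    /// Analyzes and optimizes the logical plan, then turns it into a physical plan
    /// via the DataFusion query planner.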
    #[tracing::instrument(skip_all)]
    async fn create_physical_plan(
        &self,
        ctx: &mut QueryEngineContext,
        logical_plan: &LogicalPlan,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        /// Only print context on panic, to avoid cluttering logs.
        ///
        /// TODO(discord9): remove this once we catch the bug
        #[derive(Debug)]
        struct PanicLogger<'a> {
            input_logical_plan: &'a LogicalPlan,
            after_analyze: Option<LogicalPlan>,
            after_optimize: Option<LogicalPlan>,
            phy_plan: Option<Arc<dyn ExecutionPlan>>,
        }
        impl Drop for PanicLogger<'_> {
            fn drop(&mut self) {
                if std::thread::panicking() {
                    common_telemetry::error!(
                        "Panic while creating physical plan, input logical plan: {:?}, after analyze: {:?}, after optimize: {:?}, final physical plan: {:?}",
                        self.input_logical_plan,
                        self.after_analyze,
                        self.after_optimize,
                        self.phy_plan
                    );
                }
            }
        }

        let mut logger = PanicLogger {
            input_logical_plan: logical_plan,
            after_analyze: None,
            after_optimize: None,
            phy_plan: None,
        };

        let _timer = metrics::CREATE_PHYSICAL_ELAPSED.start_timer();
        let state = ctx.state();

        common_telemetry::debug!("Create physical plan, input plan: {logical_plan}");

        // Special-case EXPLAIN plans.
        if matches!(logical_plan, DfLogicalPlan::Explain(_)) {
            return state
                .create_physical_plan(logical_plan)
                .await
                .context(error::DatafusionSnafu)
                .map_err(BoxedError::new)
                .context(QueryExecutionSnafu);
        }

        // Analyze first.
        let analyzed_plan = state
            .analyzer()
            .execute_and_check(logical_plan.clone(), state.config_options(), |_, _| {})
            .context(error::DatafusionSnafu)
            .map_err(BoxedError::new)
            .context(QueryExecutionSnafu)?;

        logger.after_analyze = Some(analyzed_plan.clone());

        common_telemetry::debug!("Create physical plan, analyzed plan: {analyzed_plan}");

        // Skip the optimizer for MergeScan plans.
        let optimized_plan = if let DfLogicalPlan::Extension(ext) = &analyzed_plan
            && ext.node.name() == MergeScanLogicalPlan::name()
        {
            analyzed_plan.clone()
        } else {
            state
                .optimizer()
                .optimize(analyzed_plan, state, |_, _| {})
                .context(error::DatafusionSnafu)
                .map_err(BoxedError::new)
                .context(QueryExecutionSnafu)?
        };

        common_telemetry::debug!("Create physical plan, optimized plan: {optimized_plan}");
        logger.after_optimize = Some(optimized_plan.clone());

        let physical_plan = state
            .query_planner()
            .create_physical_plan(&optimized_plan, state)
            .await?;

        logger.phy_plan = Some(physical_plan.clone());
        drop(logger);
        Ok(physical_plan)
    }

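    /// Optimizes a logical plan with the extension rules followed by the DataFusion optimizer.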
    #[tracing::instrument(skip_all)]
    pub fn optimize(
        &self,
        context: &QueryEngineContext,
        plan: &LogicalPlan,
    ) -> Result<LogicalPlan> {
        let _timer = metrics::OPTIMIZE_LOGICAL_ELAPSED.start_timer();

        // Optimize by extension rules first.
        let optimized_plan = self
            .state
            .optimize_by_extension_rules(plan.clone(), context)
            .context(error::DatafusionSnafu)
            .map_err(BoxedError::new)
            .context(QueryExecutionSnafu)?;

        // Then optimize with the DataFusion optimizer.
        let optimized_plan = self
            .state
            .session_state()
            .optimize(&optimized_plan)
            .context(error::DatafusionSnafu)
            .map_err(BoxedError::new)
            .context(QueryExecutionSnafu)?;

        Ok(optimized_plan)
    }

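    /// Post-processes the physical plan; currently this only replaces `AnalyzeExec`
    /// with [`DistAnalyzeExec`].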
    #[tracing::instrument(skip_all)]
    fn optimize_physical_plan(
        &self,
        ctx: &mut QueryEngineContext,
        plan: Arc<dyn ExecutionPlan>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let _timer = metrics::OPTIMIZE_PHYSICAL_ELAPSED.start_timer();

        // TODO(ruihang): `self.create_physical_plan()` already optimizes the plan, check
        // if we need to optimize it again here.
        // let state = ctx.state();
        // let config = state.config_options();

        // Skip optimizing an AnalyzeExec plan; replace it with DistAnalyzeExec instead.
        let optimized_plan = if let Some(analyze_plan) = plan.as_any().downcast_ref::<AnalyzeExec>()
        {
            let format = if let Some(format) = ctx.query_ctx().explain_format()
                && format.to_lowercase() == "json"
            {
                AnalyzeFormat::JSON
            } else {
                AnalyzeFormat::TEXT
            };
            // Sets the verbose flag of the query context.
            // The MergeScanExec plan uses the verbose flag to determine whether to print the plan in verbose mode.
            ctx.query_ctx().set_explain_verbose(analyze_plan.verbose());

            Arc::new(DistAnalyzeExec::new(
                analyze_plan.input().clone(),
                analyze_plan.verbose(),
                format,
            ))
            // let mut new_plan = analyze_plan.input().clone();
            // for optimizer in state.physical_optimizers() {
            //     new_plan = optimizer
            //         .optimize(new_plan, config)
            //         .context(DataFusionSnafu)?;
            // }
            // Arc::new(DistAnalyzeExec::new(new_plan))
        } else {
            plan
            // let mut new_plan = plan;
            // for optimizer in state.physical_optimizers() {
            //     new_plan = optimizer
            //         .optimize(new_plan, config)
            //         .context(DataFusionSnafu)?;
            // }
            // new_plan
        };

        Ok(optimized_plan)
    }
}

#[async_trait]
impl QueryEngine for DatafusionQueryEngine {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn planner(&self) -> Arc<dyn LogicalPlanner> {
        Arc::new(DfLogicalPlanner::new(self.state.clone()))
    }

    fn name(&self) -> &str {
        "datafusion"
    }

    async fn describe(
        &self,
        plan: LogicalPlan,
        query_ctx: QueryContextRef,
    ) -> Result<DescribeResult> {
        let ctx = self.engine_context(query_ctx);
        if let Ok(optimised_plan) = self.optimize(&ctx, &plan) {
            let schema = optimised_plan
                .schema()
                .clone()
                .try_into()
                .context(ConvertSchemaSnafu)?;
            Ok(DescribeResult {
                schema,
                logical_plan: optimised_plan,
            })
        } else {
            // Plans over tables like those in information_schema cannot be optimized
            // when they contain parameters, so we fall back to the original plan.
            let schema = plan
                .schema()
                .clone()
                .try_into()
                .context(ConvertSchemaSnafu)?;
            Ok(DescribeResult {
                schema,
                logical_plan: plan,
            })
        }
    }

    async fn execute(&self, plan: LogicalPlan, query_ctx: QueryContextRef) -> Result<Output> {
        match plan {
            LogicalPlan::Dml(dml) => self.exec_dml_statement(dml, query_ctx).await,
            _ => self.exec_query_plan(plan, query_ctx).await,
        }
    }

    /// Note that in SQL queries, aggregate names are looked up using
    /// lowercase unless the query uses quotes. For example,
    ///
    /// `SELECT MY_UDAF(x)...` will look for an aggregate named `"my_udaf"`
    /// `SELECT "my_UDAF"(x)` will look for an aggregate named `"my_UDAF"`
    ///
    /// So it's better to make the UDAF name lowercase when creating one.
    fn register_aggregate_function(&self, func: AggregateUDF) {
        self.state.register_aggr_function(func);
    }

    /// Registers a scalar function.
    /// Overrides any function already registered under the same name.
    fn register_scalar_function(&self, func: ScalarFunctionFactory) {
        self.state.register_scalar_function(func);
    }

    fn read_table(&self, table: TableRef) -> Result<DataFrame> {
        Ok(DataFrame::DataFusion(
            self.state
                .read_table(table)
                .context(error::DatafusionSnafu)
                .map_err(BoxedError::new)
                .context(QueryExecutionSnafu)?,
        ))
    }

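    /// Builds a [`QueryEngineContext`] for this query, applying the parallelism and
    /// fallback hints carried by the query context to the session configuration.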
    fn engine_context(&self, query_ctx: QueryContextRef) -> QueryEngineContext {
        let mut state = self.state.session_state();
        state.config_mut().set_extension(query_ctx.clone());
        // Note that hints in the "x-greptime-hints" header are automatically parsed
        // and stored as query context extensions, so we can read them from the query context.
        if let Some(parallelism) = query_ctx.extension(QUERY_PARALLELISM_HINT) {
            if let Ok(n) = parallelism.parse::<u64>() {
                if n > 0 {
                    let new_cfg = state.config().clone().with_target_partitions(n as usize);
                    *state.config_mut() = new_cfg;
                }
            } else {
                common_telemetry::warn!(
                    "Failed to parse query_parallelism: {}, using default value",
                    parallelism
                );
            }
        }

        // A SQL client's `SET` variable and a gRPC client's header hint rarely appear
        // together, so handling them separately is sufficient.
        if query_ctx.configuration_parameter().allow_query_fallback() {
            state
                .config_mut()
                .options_mut()
                .extensions
                .insert(DistPlannerOptions {
                    allow_query_fallback: true,
                });
        } else if let Some(fallback) = query_ctx.extension(QUERY_FALLBACK_HINT) {
            // Also check the query context for the fallback hint;
            // if it is set, enable the fallback.
            if fallback.to_lowercase().parse::<bool>().unwrap_or(false) {
                state
                    .config_mut()
                    .options_mut()
                    .extensions
                    .insert(DistPlannerOptions {
                        allow_query_fallback: true,
                    });
            }
        }
        QueryEngineContext::new(state, query_ctx)
    }

    fn engine_state(&self) -> &QueryEngineState {
        &self.state
    }
}

impl QueryExecutor for DatafusionQueryEngine {
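    /// Executes a physical plan as a [`SendableRecordBatchStream`], coalescing
    /// multi-partition plans into a single output stream.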
    #[tracing::instrument(skip_all)]
    fn execute_stream(
        &self,
        ctx: &QueryEngineContext,
        plan: &Arc<dyn ExecutionPlan>,
    ) -> Result<SendableRecordBatchStream> {
        let explain_verbose = ctx.query_ctx().explain_verbose();
        let output_partitions = plan.properties().output_partitioning().partition_count();
        if explain_verbose {
            common_telemetry::info!("Executing query plan, output_partitions: {output_partitions}");
        }

        let exec_timer = metrics::EXEC_PLAN_ELAPSED.start_timer();
        let task_ctx = ctx.build_task_ctx();

        match plan.properties().output_partitioning().partition_count() {
            0 => {
                let schema = Arc::new(
                    Schema::try_from(plan.schema())
                        .map_err(BoxedError::new)
                        .context(QueryExecutionSnafu)?,
                );
                Ok(Box::pin(EmptyRecordBatchStream::new(schema)))
            }
            1 => {
                let df_stream = plan
                    .execute(0, task_ctx)
                    .context(error::DatafusionSnafu)
                    .map_err(BoxedError::new)
                    .context(QueryExecutionSnafu)?;
                let mut stream = RecordBatchStreamAdapter::try_new(df_stream)
                    .context(error::ConvertDfRecordBatchStreamSnafu)
                    .map_err(BoxedError::new)
                    .context(QueryExecutionSnafu)?;
                stream.set_metrics2(plan.clone());
                stream.set_explain_verbose(explain_verbose);
                let stream = OnDone::new(Box::pin(stream), move || {
                    let exec_cost = exec_timer.stop_and_record();
                    if explain_verbose {
                        common_telemetry::info!(
                            "DatafusionQueryEngine execute 1 stream, cost: {:?}s",
                            exec_cost,
                        );
                    }
                });
                Ok(Box::pin(stream))
            }
            _ => {
                // merge into a single partition
                let merged_plan = CoalescePartitionsExec::new(plan.clone());
                // CoalescePartitionsExec must produce a single partition
                assert_eq!(
                    1,
                    merged_plan
                        .properties()
                        .output_partitioning()
                        .partition_count()
                );
                let df_stream = merged_plan
                    .execute(0, task_ctx)
                    .context(error::DatafusionSnafu)
                    .map_err(BoxedError::new)
                    .context(QueryExecutionSnafu)?;
                let mut stream = RecordBatchStreamAdapter::try_new(df_stream)
                    .context(error::ConvertDfRecordBatchStreamSnafu)
                    .map_err(BoxedError::new)
                    .context(QueryExecutionSnafu)?;
                stream.set_metrics2(plan.clone());
                stream.set_explain_verbose(ctx.query_ctx().explain_verbose());
                let stream = OnDone::new(Box::pin(stream), move || {
                    let exec_cost = exec_timer.stop_and_record();
                    if explain_verbose {
                        common_telemetry::info!(
                            "DatafusionQueryEngine execute {output_partitions} stream, cost: {:?}s",
                            exec_cost
                        );
                    }
                });
                Ok(Box::pin(stream))
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use catalog::RegisterTableRequest;
    use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, NUMBERS_TABLE_ID};
    use common_recordbatch::util;
    use datafusion::prelude::{col, lit};
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::ColumnSchema;
    use datatypes::vectors::{Helper, UInt32Vector, UInt64Vector, VectorRef};
    use session::context::{QueryContext, QueryContextBuilder};
    use table::table::numbers::{NumbersTable, NUMBERS_TABLE_NAME};

    use super::*;
    use crate::options::QueryOptions;
    use crate::parser::QueryLanguageParser;
    use crate::query_engine::{QueryEngineFactory, QueryEngineRef};

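    /// Builds a query engine over an in-memory catalog that contains only the `numbers` table.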
    async fn create_test_engine() -> QueryEngineRef {
        let catalog_manager = catalog::memory::new_memory_catalog_manager().unwrap();
        let req = RegisterTableRequest {
            catalog: DEFAULT_CATALOG_NAME.to_string(),
            schema: DEFAULT_SCHEMA_NAME.to_string(),
            table_name: NUMBERS_TABLE_NAME.to_string(),
            table_id: NUMBERS_TABLE_ID,
            table: NumbersTable::table(NUMBERS_TABLE_ID),
        };
        catalog_manager.register_table_sync(req).unwrap();

        QueryEngineFactory::new(
            catalog_manager,
            None,
            None,
            None,
            None,
            false,
            QueryOptions::default(),
        )
        .query_engine()
    }

    #[tokio::test]
    async fn test_sql_to_plan() {
        let engine = create_test_engine().await;
        let sql = "select sum(number) from numbers limit 20";

        let stmt = QueryLanguageParser::parse_sql(sql, &QueryContext::arc()).unwrap();
        let plan = engine
            .planner()
            .plan(&stmt, QueryContext::arc())
            .await
            .unwrap();

        assert_eq!(
            plan.to_string(),
            r#"Limit: skip=0, fetch=20
  Projection: sum(numbers.number)
    Aggregate: groupBy=[[]], aggr=[[sum(numbers.number)]]
      TableScan: numbers"#
        );
    }

    #[tokio::test]
    async fn test_execute() {
        let engine = create_test_engine().await;
        let sql = "select sum(number) from numbers limit 20";

        let stmt = QueryLanguageParser::parse_sql(sql, &QueryContext::arc()).unwrap();
        let plan = engine
            .planner()
            .plan(&stmt, QueryContext::arc())
            .await
            .unwrap();

        let output = engine.execute(plan, QueryContext::arc()).await.unwrap();

        match output.data {
            OutputData::Stream(recordbatch) => {
                let numbers = util::collect(recordbatch).await.unwrap();
                assert_eq!(1, numbers.len());
                assert_eq!(numbers[0].num_columns(), 1);
                assert_eq!(1, numbers[0].schema.num_columns());
                assert_eq!(
                    "sum(numbers.number)",
                    numbers[0].schema.column_schemas()[0].name
                );

                let batch = &numbers[0];
                assert_eq!(1, batch.num_columns());
                assert_eq!(batch.column(0).len(), 1);

                assert_eq!(
                    *batch.column(0),
                    Arc::new(UInt64Vector::from_slice([4950])) as VectorRef
                );
            }
            _ => unreachable!(),
        }
    }

    #[tokio::test]
    async fn test_read_table() {
        let engine = create_test_engine().await;

        let engine = engine
            .as_any()
            .downcast_ref::<DatafusionQueryEngine>()
            .unwrap();
        let query_ctx = Arc::new(QueryContextBuilder::default().build());
        let table = engine
            .find_table(
                &ResolvedTableReference {
                    catalog: "greptime".into(),
                    schema: "public".into(),
                    table: "numbers".into(),
                },
                &query_ctx,
            )
            .await
            .unwrap();

        let DataFrame::DataFusion(df) = engine.read_table(table).unwrap();
        let df = df
            .select_columns(&["number"])
            .unwrap()
            .filter(col("number").lt(lit(10)))
            .unwrap();
        let batches = df.collect().await.unwrap();
        assert_eq!(1, batches.len());
        let batch = &batches[0];

        assert_eq!(1, batch.num_columns());
        assert_eq!(batch.column(0).len(), 10);

        assert_eq!(
            Helper::try_into_vector(batch.column(0)).unwrap(),
            Arc::new(UInt32Vector::from_slice([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])) as VectorRef
        );
    }

    #[tokio::test]
    async fn test_describe() {
        let engine = create_test_engine().await;
        let sql = "select sum(number) from numbers limit 20";

        let stmt = QueryLanguageParser::parse_sql(sql, &QueryContext::arc()).unwrap();

        let plan = engine
            .planner()
            .plan(&stmt, QueryContext::arc())
            .await
            .unwrap();

        let DescribeResult {
            schema,
            logical_plan,
        } = engine.describe(plan, QueryContext::arc()).await.unwrap();

        assert_eq!(
            schema.column_schemas()[0],
            ColumnSchema::new(
                "sum(numbers.number)",
                ConcreteDataType::uint64_datatype(),
                true
            )
        );
        assert_eq!("Limit: skip=0, fetch=20\n  Aggregate: groupBy=[[]], aggr=[[sum(CAST(numbers.number AS UInt64))]]\n    TableScan: numbers projection=[number]", format!("{}", logical_plan.display_indent()));
    }
}