mod error;
mod planner;

use std::any::Any;
use std::collections::HashMap;
use std::sync::Arc;

use async_trait::async_trait;
use common_base::Plugins;
use common_catalog::consts::is_readonly_schema;
use common_error::ext::BoxedError;
use common_function::function_factory::ScalarFunctionFactory;
use common_query::{Output, OutputData, OutputMeta};
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::{EmptyRecordBatchStream, SendableRecordBatchStream};
use common_telemetry::tracing;
use datafusion::physical_plan::analyze::AnalyzeExec;
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion::physical_plan::ExecutionPlan;
use datafusion_common::ResolvedTableReference;
use datafusion_expr::{
    AggregateUDF, DmlStatement, LogicalPlan as DfLogicalPlan, LogicalPlan, WriteOp,
};
use datatypes::prelude::VectorRef;
use datatypes::schema::Schema;
use futures_util::StreamExt;
use session::context::QueryContextRef;
use snafu::{ensure, OptionExt, ResultExt};
use sqlparser::ast::AnalyzeFormat;
use table::requests::{DeleteRequest, InsertRequest};
use table::TableRef;

use crate::analyze::DistAnalyzeExec;
use crate::dataframe::DataFrame;
pub use crate::datafusion::planner::DfContextProviderAdapter;
use crate::dist_plan::{DistPlannerOptions, MergeScanLogicalPlan};
use crate::error::{
    CatalogSnafu, ConvertSchemaSnafu, CreateRecordBatchSnafu, MissingTableMutationHandlerSnafu,
    MissingTimestampColumnSnafu, QueryExecutionSnafu, Result, TableMutationSnafu,
    TableNotFoundSnafu, TableReadOnlySnafu, UnsupportedExprSnafu,
};
use crate::executor::QueryExecutor;
use crate::metrics::{OnDone, QUERY_STAGE_ELAPSED};
use crate::physical_wrapper::PhysicalPlanWrapperRef;
use crate::planner::{DfLogicalPlanner, LogicalPlanner};
use crate::query_engine::{DescribeResult, QueryEngineContext, QueryEngineState};
use crate::{metrics, QueryEngine};

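/// Query context hint key that overrides the target partition count
/// (parallelism) of the session configuration. Non-positive values are
/// ignored; values that fail to parse fall back to the default with a warning.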
pub const QUERY_PARALLELISM_HINT: &str = "query_parallelism";

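/// Query context hint key that, when set to `true`, enables
/// [`DistPlannerOptions::allow_query_fallback`] for the query.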
pub const QUERY_FALLBACK_HINT: &str = "query_fallback";

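/// [`QueryEngine`] implementation based on Apache DataFusion.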
pub struct DatafusionQueryEngine {
    state: Arc<QueryEngineState>,
    plugins: Plugins,
}

impl DatafusionQueryEngine {
    pub fn new(state: Arc<QueryEngineState>, plugins: Plugins) -> Self {
        Self { state, plugins }
    }

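    /// Plans and executes a query: creates the physical plan, runs the
    /// physical optimizer, optionally wraps the plan with a plugin-provided
    /// [`PhysicalPlanWrapperRef`], and returns the output as a stream.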
    #[tracing::instrument(skip_all)]
    async fn exec_query_plan(
        &self,
        plan: LogicalPlan,
        query_ctx: QueryContextRef,
    ) -> Result<Output> {
        let mut ctx = self.engine_context(query_ctx.clone());

        let physical_plan = self.create_physical_plan(&mut ctx, &plan).await?;
        let optimized_physical_plan = self.optimize_physical_plan(&mut ctx, physical_plan)?;

        let physical_plan = if let Some(wrapper) = self.plugins.get::<PhysicalPlanWrapperRef>() {
            wrapper.wrap(optimized_physical_plan, query_ctx)
        } else {
            optimized_physical_plan
        };

        Ok(Output::new(
            OutputData::Stream(self.execute_stream(&ctx, &physical_plan)?),
            OutputMeta::new_with_plan(physical_plan),
        ))
    }

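    /// Executes an `INSERT` or `DELETE` statement: runs the statement's input
    /// plan, then feeds the resulting batches to the table mutation handler.
    /// Returns the affected row count (plus the insert cost for inserts).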
    #[tracing::instrument(skip_all)]
    async fn exec_dml_statement(
        &self,
        dml: DmlStatement,
        query_ctx: QueryContextRef,
    ) -> Result<Output> {
        ensure!(
            matches!(dml.op, WriteOp::Insert(_) | WriteOp::Delete),
            UnsupportedExprSnafu {
                name: format!("DML op {}", dml.op),
            }
        );

        let _timer = QUERY_STAGE_ELAPSED
            .with_label_values(&[dml.op.name()])
            .start_timer();

        let default_catalog = query_ctx.current_catalog().to_owned();
        let default_schema = query_ctx.current_schema();
        let table_name = dml.table_name.resolve(&default_catalog, &default_schema);
        let table = self.find_table(&table_name, &query_ctx).await?;

        let output = self
            .exec_query_plan((*dml.input).clone(), query_ctx.clone())
            .await?;
        let mut stream = match output.data {
            OutputData::RecordBatches(batches) => batches.as_stream(),
            OutputData::Stream(stream) => stream,
            _ => unreachable!(),
        };

        let mut affected_rows = 0;
        let mut insert_cost = 0;

        while let Some(batch) = stream.next().await {
            let batch = batch.context(CreateRecordBatchSnafu)?;
            let column_vectors = batch
                .column_vectors(&table_name.to_string(), table.schema())
                .map_err(BoxedError::new)
                .context(QueryExecutionSnafu)?;

            match dml.op {
                WriteOp::Insert(_) => {
                    let output = self
                        .insert(&table_name, column_vectors, query_ctx.clone())
                        .await?;
                    let (rows, cost) = output.extract_rows_and_cost();
                    affected_rows += rows;
                    insert_cost += cost;
                }
                WriteOp::Delete => {
                    affected_rows += self
                        .delete(&table_name, &table, column_vectors, query_ctx.clone())
                        .await?;
                }
                _ => unreachable!("guarded by the 'ensure!' at the beginning"),
            }
        }
        Ok(Output::new(
            OutputData::AffectedRows(affected_rows),
            OutputMeta::new_with_cost(insert_cost),
        ))
    }

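    /// Deletes rows from the target table. Only the timestamp column and the
    /// row-key columns are used as the delete key; other columns in
    /// `column_vectors` are ignored. Read-only schemas are rejected.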
    #[tracing::instrument(skip_all)]
    async fn delete(
        &self,
        table_name: &ResolvedTableReference,
        table: &TableRef,
        column_vectors: HashMap<String, VectorRef>,
        query_ctx: QueryContextRef,
    ) -> Result<usize> {
        let catalog_name = table_name.catalog.to_string();
        let schema_name = table_name.schema.to_string();
        let table_name = table_name.table.to_string();
        let table_schema = table.schema();

        ensure!(
            !is_readonly_schema(&schema_name),
            TableReadOnlySnafu { table: table_name }
        );

        let ts_column = table_schema
            .timestamp_column()
            .map(|x| &x.name)
            .with_context(|| MissingTimestampColumnSnafu {
                table_name: table_name.to_string(),
            })?;

        let table_info = table.table_info();
        let rowkey_columns = table_info
            .meta
            .row_key_column_names()
            .collect::<Vec<&String>>();
        let column_vectors = column_vectors
            .into_iter()
            .filter(|x| &x.0 == ts_column || rowkey_columns.contains(&&x.0))
            .collect::<HashMap<_, _>>();

        let request = DeleteRequest {
            catalog_name,
            schema_name,
            table_name,
            key_column_values: column_vectors,
        };

        self.state
            .table_mutation_handler()
            .context(MissingTableMutationHandlerSnafu)?
            .delete(request, query_ctx)
            .await
            .context(TableMutationSnafu)
    }

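    /// Inserts the given column vectors into the target table through the
    /// table mutation handler. Read-only schemas are rejected.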
    #[tracing::instrument(skip_all)]
    async fn insert(
        &self,
        table_name: &ResolvedTableReference,
        column_vectors: HashMap<String, VectorRef>,
        query_ctx: QueryContextRef,
    ) -> Result<Output> {
        let catalog_name = table_name.catalog.to_string();
        let schema_name = table_name.schema.to_string();
        let table_name = table_name.table.to_string();

        ensure!(
            !is_readonly_schema(&schema_name),
            TableReadOnlySnafu { table: table_name }
        );

        let request = InsertRequest {
            catalog_name,
            schema_name,
            table_name,
            columns_values: column_vectors,
        };

        self.state
            .table_mutation_handler()
            .context(MissingTableMutationHandlerSnafu)?
            .insert(request, query_ctx)
            .await
            .context(TableMutationSnafu)
    }

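    /// Looks up a table by its resolved reference through the catalog manager.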
    async fn find_table(
        &self,
        table_name: &ResolvedTableReference,
        query_context: &QueryContextRef,
    ) -> Result<TableRef> {
        let catalog_name = table_name.catalog.as_ref();
        let schema_name = table_name.schema.as_ref();
        let table_name = table_name.table.as_ref();

        self.state
            .catalog_manager()
            .table(catalog_name, schema_name, table_name, Some(query_context))
            .await
            .context(CatalogSnafu)?
            .with_context(|| TableNotFoundSnafu { table: table_name })
    }

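    /// Turns a logical plan into a physical [`ExecutionPlan`]: runs the
    /// analyzer and the logical optimizer, then the query planner. `EXPLAIN`
    /// plans skip the extra passes, and plans already rooted at a `MergeScan`
    /// skip the logical optimizer. If any stage panics, the intermediate
    /// plans are logged for debugging.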
    #[tracing::instrument(skip_all)]
    async fn create_physical_plan(
        &self,
        ctx: &mut QueryEngineContext,
        logical_plan: &LogicalPlan,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        /// Logs the intermediate plans if a panic happens during planning.
        #[derive(Debug)]
        struct PanicLogger<'a> {
            input_logical_plan: &'a LogicalPlan,
            after_analyze: Option<LogicalPlan>,
            after_optimize: Option<LogicalPlan>,
            phy_plan: Option<Arc<dyn ExecutionPlan>>,
        }
        impl Drop for PanicLogger<'_> {
            fn drop(&mut self) {
                if std::thread::panicking() {
                    common_telemetry::error!(
                        "Panic while creating physical plan, input logical plan: {:?}, after analyze: {:?}, after optimize: {:?}, final physical plan: {:?}",
                        self.input_logical_plan,
                        self.after_analyze,
                        self.after_optimize,
                        self.phy_plan
                    );
                }
            }
        }

        let mut logger = PanicLogger {
            input_logical_plan: logical_plan,
            after_analyze: None,
            after_optimize: None,
            phy_plan: None,
        };

        let _timer = metrics::CREATE_PHYSICAL_ELAPSED.start_timer();
        let state = ctx.state();

        common_telemetry::debug!("Create physical plan, input plan: {logical_plan}");

        // `EXPLAIN` plans are handed to DataFusion directly, skipping the
        // analyze and optimize passes below.
        if matches!(logical_plan, DfLogicalPlan::Explain(_)) {
            return state
                .create_physical_plan(logical_plan)
                .await
                .context(error::DatafusionSnafu)
                .map_err(BoxedError::new)
                .context(QueryExecutionSnafu);
        }

        let analyzed_plan = state
            .analyzer()
            .execute_and_check(logical_plan.clone(), state.config_options(), |_, _| {})
            .context(error::DatafusionSnafu)
            .map_err(BoxedError::new)
            .context(QueryExecutionSnafu)?;

        logger.after_analyze = Some(analyzed_plan.clone());

        common_telemetry::debug!("Create physical plan, analyzed plan: {analyzed_plan}");

        // Skip the logical optimizer if the plan is already a `MergeScan`.
        let optimized_plan = if let DfLogicalPlan::Extension(ext) = &analyzed_plan
            && ext.node.name() == MergeScanLogicalPlan::name()
        {
            analyzed_plan.clone()
        } else {
            state
                .optimizer()
                .optimize(analyzed_plan, state, |_, _| {})
                .context(error::DatafusionSnafu)
                .map_err(BoxedError::new)
                .context(QueryExecutionSnafu)?
        };

        common_telemetry::debug!("Create physical plan, optimized plan: {optimized_plan}");
        logger.after_optimize = Some(optimized_plan.clone());

        let physical_plan = state
            .query_planner()
            .create_physical_plan(&optimized_plan, state)
            .await?;

        logger.phy_plan = Some(physical_plan.clone());
        drop(logger);
        Ok(physical_plan)
    }

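    /// Optimizes a logical plan, first with this engine's extension rules and
    /// then with the DataFusion session-state optimizer.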
    #[tracing::instrument(skip_all)]
    pub fn optimize(
        &self,
        context: &QueryEngineContext,
        plan: &LogicalPlan,
    ) -> Result<LogicalPlan> {
        let _timer = metrics::OPTIMIZE_LOGICAL_ELAPSED.start_timer();

        // Optimize by extension rules first.
        let optimized_plan = self
            .state
            .optimize_by_extension_rules(plan.clone(), context)
            .context(error::DatafusionSnafu)
            .map_err(BoxedError::new)
            .context(QueryExecutionSnafu)?;

        // Then run the DataFusion optimizer.
        let optimized_plan = self
            .state
            .session_state()
            .optimize(&optimized_plan)
            .context(error::DatafusionSnafu)
            .map_err(BoxedError::new)
            .context(QueryExecutionSnafu)?;

        Ok(optimized_plan)
    }

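    /// Rewrites a top-level [`AnalyzeExec`] (from `EXPLAIN ANALYZE`) into a
    /// [`DistAnalyzeExec`]; all other plans pass through unchanged.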
    #[tracing::instrument(skip_all)]
    fn optimize_physical_plan(
        &self,
        ctx: &mut QueryEngineContext,
        plan: Arc<dyn ExecutionPlan>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let _timer = metrics::OPTIMIZE_PHYSICAL_ELAPSED.start_timer();

        let optimized_plan = if let Some(analyze_plan) = plan.as_any().downcast_ref::<AnalyzeExec>()
        {
            let format = if let Some(format) = ctx.query_ctx().explain_format()
                && format.to_lowercase() == "json"
            {
                AnalyzeFormat::JSON
            } else {
                AnalyzeFormat::TEXT
            };
            ctx.query_ctx().set_explain_verbose(analyze_plan.verbose());

            Arc::new(DistAnalyzeExec::new(
                analyze_plan.input().clone(),
                analyze_plan.verbose(),
                format,
            ))
        } else {
            plan
        };

        Ok(optimized_plan)
    }
}

#[async_trait]
impl QueryEngine for DatafusionQueryEngine {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn planner(&self) -> Arc<dyn LogicalPlanner> {
        Arc::new(DfLogicalPlanner::new(self.state.clone()))
    }

    fn name(&self) -> &str {
        "datafusion"
    }

    async fn describe(
        &self,
        plan: LogicalPlan,
        query_ctx: QueryContextRef,
    ) -> Result<DescribeResult> {
        let ctx = self.engine_context(query_ctx);
        if let Ok(optimized_plan) = self.optimize(&ctx, &plan) {
            let schema = optimized_plan
                .schema()
                .clone()
                .try_into()
                .context(ConvertSchemaSnafu)?;
            Ok(DescribeResult {
                schema,
                logical_plan: optimized_plan,
            })
        } else {
            // Fall back to the unoptimized plan if optimization fails.
            let schema = plan
                .schema()
                .clone()
                .try_into()
                .context(ConvertSchemaSnafu)?;
            Ok(DescribeResult {
                schema,
                logical_plan: plan,
            })
        }
    }

    async fn execute(&self, plan: LogicalPlan, query_ctx: QueryContextRef) -> Result<Output> {
        match plan {
            LogicalPlan::Dml(dml) => self.exec_dml_statement(dml, query_ctx).await,
            _ => self.exec_query_plan(plan, query_ctx).await,
        }
    }

    /// Registers an aggregate UDF with the engine state.
    fn register_aggregate_function(&self, func: AggregateUDF) {
        self.state.register_aggr_function(func);
    }

    /// Registers a scalar function factory with the engine state.
    fn register_scalar_function(&self, func: ScalarFunctionFactory) {
        self.state.register_scalar_function(func);
    }

    fn read_table(&self, table: TableRef) -> Result<DataFrame> {
        Ok(DataFrame::DataFusion(
            self.state
                .read_table(table)
                .context(error::DatafusionSnafu)
                .map_err(BoxedError::new)
                .context(QueryExecutionSnafu)?,
        ))
    }

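    /// Builds a [`QueryEngineContext`] for a query, applying the
    /// `query_parallelism` and `query_fallback` hints from the query context
    /// to the session configuration.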
    fn engine_context(&self, query_ctx: QueryContextRef) -> QueryEngineContext {
        let mut state = self.state.session_state();
        state.config_mut().set_extension(query_ctx.clone());

        // Apply the parallelism hint: a positive integer overrides the
        // target partition count.
        if let Some(parallelism) = query_ctx.extension(QUERY_PARALLELISM_HINT) {
            if let Ok(n) = parallelism.parse::<u64>() {
                if n > 0 {
                    let new_cfg = state.config().clone().with_target_partitions(n as usize);
                    *state.config_mut() = new_cfg;
                }
            } else {
                common_telemetry::warn!(
                    "Failed to parse query_parallelism: {}, using default value",
                    parallelism
                );
            }
        }

        // Enable query fallback when allowed by configuration or requested via hint.
        if query_ctx.configuration_parameter().allow_query_fallback() {
            state
                .config_mut()
                .options_mut()
                .extensions
                .insert(DistPlannerOptions {
                    allow_query_fallback: true,
                });
        } else if let Some(fallback) = query_ctx.extension(QUERY_FALLBACK_HINT) {
            if fallback.to_lowercase().parse::<bool>().unwrap_or(false) {
                state
                    .config_mut()
                    .options_mut()
                    .extensions
                    .insert(DistPlannerOptions {
                        allow_query_fallback: true,
                    });
            }
        }
        QueryEngineContext::new(state, query_ctx)
    }

    fn engine_state(&self) -> &QueryEngineState {
        &self.state
    }
}

impl QueryExecutor for DatafusionQueryEngine {
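    /// Executes a physical plan and returns the result as a stream of record
    /// batches. A zero-partition plan yields an empty stream; a
    /// multi-partition plan is first coalesced into a single partition.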
    #[tracing::instrument(skip_all)]
    fn execute_stream(
        &self,
        ctx: &QueryEngineContext,
        plan: &Arc<dyn ExecutionPlan>,
    ) -> Result<SendableRecordBatchStream> {
        let explain_verbose = ctx.query_ctx().explain_verbose();
        let output_partitions = plan.properties().output_partitioning().partition_count();
        if explain_verbose {
            common_telemetry::info!("Executing query plan, output_partitions: {output_partitions}");
        }

        let exec_timer = metrics::EXEC_PLAN_ELAPSED.start_timer();
        let task_ctx = ctx.build_task_ctx();

        match output_partitions {
            0 => {
                let schema = Arc::new(
                    Schema::try_from(plan.schema())
                        .map_err(BoxedError::new)
                        .context(QueryExecutionSnafu)?,
                );
                Ok(Box::pin(EmptyRecordBatchStream::new(schema)))
            }
            1 => {
                let df_stream = plan
                    .execute(0, task_ctx)
                    .context(error::DatafusionSnafu)
                    .map_err(BoxedError::new)
                    .context(QueryExecutionSnafu)?;
                let mut stream = RecordBatchStreamAdapter::try_new(df_stream)
                    .context(error::ConvertDfRecordBatchStreamSnafu)
                    .map_err(BoxedError::new)
                    .context(QueryExecutionSnafu)?;
                stream.set_metrics2(plan.clone());
                stream.set_explain_verbose(explain_verbose);
                let stream = OnDone::new(Box::pin(stream), move || {
                    let exec_cost = exec_timer.stop_and_record();
                    if explain_verbose {
                        common_telemetry::info!(
                            "DatafusionQueryEngine executed 1 stream, cost: {:?}s",
                            exec_cost,
                        );
                    }
                });
                Ok(Box::pin(stream))
            }
            _ => {
                let merged_plan = CoalescePartitionsExec::new(plan.clone());
                // The merged plan is expected to have a single output partition.
                assert_eq!(
                    1,
                    merged_plan
                        .properties()
                        .output_partitioning()
                        .partition_count()
                );
                let df_stream = merged_plan
                    .execute(0, task_ctx)
                    .context(error::DatafusionSnafu)
                    .map_err(BoxedError::new)
                    .context(QueryExecutionSnafu)?;
                let mut stream = RecordBatchStreamAdapter::try_new(df_stream)
                    .context(error::ConvertDfRecordBatchStreamSnafu)
                    .map_err(BoxedError::new)
                    .context(QueryExecutionSnafu)?;
                stream.set_metrics2(plan.clone());
                stream.set_explain_verbose(explain_verbose);
                let stream = OnDone::new(Box::pin(stream), move || {
                    let exec_cost = exec_timer.stop_and_record();
                    if explain_verbose {
                        common_telemetry::info!(
                            "DatafusionQueryEngine executed {output_partitions} streams, cost: {:?}s",
                            exec_cost
                        );
                    }
                });
                Ok(Box::pin(stream))
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use catalog::RegisterTableRequest;
    use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, NUMBERS_TABLE_ID};
    use common_recordbatch::util;
    use datafusion::prelude::{col, lit};
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::ColumnSchema;
    use datatypes::vectors::{Helper, UInt32Vector, UInt64Vector, VectorRef};
    use session::context::{QueryContext, QueryContextBuilder};
    use table::table::numbers::{NumbersTable, NUMBERS_TABLE_NAME};

    use super::*;
    use crate::options::QueryOptions;
    use crate::parser::QueryLanguageParser;
    use crate::query_engine::{QueryEngineFactory, QueryEngineRef};

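    /// Builds a query engine backed by an in-memory catalog with the
    /// `numbers` table registered.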
    async fn create_test_engine() -> QueryEngineRef {
        let catalog_manager = catalog::memory::new_memory_catalog_manager().unwrap();
        let req = RegisterTableRequest {
            catalog: DEFAULT_CATALOG_NAME.to_string(),
            schema: DEFAULT_SCHEMA_NAME.to_string(),
            table_name: NUMBERS_TABLE_NAME.to_string(),
            table_id: NUMBERS_TABLE_ID,
            table: NumbersTable::table(NUMBERS_TABLE_ID),
        };
        catalog_manager.register_table_sync(req).unwrap();

        QueryEngineFactory::new(
            catalog_manager,
            None,
            None,
            None,
            None,
            false,
            QueryOptions::default(),
        )
        .query_engine()
    }

    #[tokio::test]
    async fn test_sql_to_plan() {
        let engine = create_test_engine().await;
        let sql = "select sum(number) from numbers limit 20";

        let stmt = QueryLanguageParser::parse_sql(sql, &QueryContext::arc()).unwrap();
        let plan = engine
            .planner()
            .plan(&stmt, QueryContext::arc())
            .await
            .unwrap();

        assert_eq!(
            plan.to_string(),
            r#"Limit: skip=0, fetch=20
  Projection: sum(numbers.number)
    Aggregate: groupBy=[[]], aggr=[[sum(numbers.number)]]
      TableScan: numbers"#
        );
    }

    #[tokio::test]
    async fn test_execute() {
        let engine = create_test_engine().await;
        let sql = "select sum(number) from numbers limit 20";

        let stmt = QueryLanguageParser::parse_sql(sql, &QueryContext::arc()).unwrap();
        let plan = engine
            .planner()
            .plan(&stmt, QueryContext::arc())
            .await
            .unwrap();

        let output = engine.execute(plan, QueryContext::arc()).await.unwrap();

        match output.data {
            OutputData::Stream(recordbatch) => {
                let numbers = util::collect(recordbatch).await.unwrap();
                assert_eq!(1, numbers.len());
                assert_eq!(numbers[0].num_columns(), 1);
                assert_eq!(1, numbers[0].schema.num_columns());
                assert_eq!(
                    "sum(numbers.number)",
                    numbers[0].schema.column_schemas()[0].name
                );

                let batch = &numbers[0];
                assert_eq!(1, batch.num_columns());
                assert_eq!(batch.column(0).len(), 1);

                assert_eq!(
                    *batch.column(0),
                    Arc::new(UInt64Vector::from_slice([4950])) as VectorRef
                );
            }
            _ => unreachable!(),
        }
    }

    #[tokio::test]
    async fn test_read_table() {
        let engine = create_test_engine().await;

        let engine = engine
            .as_any()
            .downcast_ref::<DatafusionQueryEngine>()
            .unwrap();
        let query_ctx = Arc::new(QueryContextBuilder::default().build());
        let table = engine
            .find_table(
                &ResolvedTableReference {
                    catalog: "greptime".into(),
                    schema: "public".into(),
                    table: "numbers".into(),
                },
                &query_ctx,
            )
            .await
            .unwrap();

        let DataFrame::DataFusion(df) = engine.read_table(table).unwrap();
        let df = df
            .select_columns(&["number"])
            .unwrap()
            .filter(col("number").lt(lit(10)))
            .unwrap();
        let batches = df.collect().await.unwrap();
        assert_eq!(1, batches.len());
        let batch = &batches[0];

        assert_eq!(1, batch.num_columns());
        assert_eq!(batch.column(0).len(), 10);

        assert_eq!(
            Helper::try_into_vector(batch.column(0)).unwrap(),
            Arc::new(UInt32Vector::from_slice([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])) as VectorRef
        );
    }

    #[tokio::test]
    async fn test_describe() {
        let engine = create_test_engine().await;
        let sql = "select sum(number) from numbers limit 20";

        let stmt = QueryLanguageParser::parse_sql(sql, &QueryContext::arc()).unwrap();

        let plan = engine
            .planner()
            .plan(&stmt, QueryContext::arc())
            .await
            .unwrap();

        let DescribeResult {
            schema,
            logical_plan,
        } = engine.describe(plan, QueryContext::arc()).await.unwrap();

        assert_eq!(
            schema.column_schemas()[0],
            ColumnSchema::new(
                "sum(numbers.number)",
                ConcreteDataType::uint64_datatype(),
                true
            )
        );
        assert_eq!(
            "Limit: skip=0, fetch=20\n  Aggregate: groupBy=[[]], aggr=[[sum(CAST(numbers.number AS UInt64))]]\n    TableScan: numbers projection=[number]",
            format!("{}", logical_plan.display_indent())
        );
    }
}
825}