mod error;
mod planner;

use std::any::Any;
use std::collections::HashMap;
use std::sync::Arc;

use async_trait::async_trait;
use common_base::Plugins;
use common_catalog::consts::is_readonly_schema;
use common_error::ext::BoxedError;
use common_function::function_factory::ScalarFunctionFactory;
use common_query::{Output, OutputData, OutputMeta};
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::{EmptyRecordBatchStream, SendableRecordBatchStream};
use common_telemetry::tracing;
use datafusion::catalog::TableFunction;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_plan::analyze::AnalyzeExec;
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion_common::ResolvedTableReference;
use datafusion_expr::{
    AggregateUDF, DmlStatement, LogicalPlan as DfLogicalPlan, LogicalPlan, WriteOp,
};
use datatypes::prelude::VectorRef;
use datatypes::schema::Schema;
use futures_util::StreamExt;
use session::context::QueryContextRef;
use snafu::{OptionExt, ResultExt, ensure};
use sqlparser::ast::AnalyzeFormat;
use table::TableRef;
use table::requests::{DeleteRequest, InsertRequest};

use crate::analyze::DistAnalyzeExec;
use crate::dataframe::DataFrame;
pub use crate::datafusion::planner::DfContextProviderAdapter;
use crate::dist_plan::{DistPlannerOptions, MergeScanLogicalPlan};
use crate::error::{
    CatalogSnafu, ConvertSchemaSnafu, CreateRecordBatchSnafu, MissingTableMutationHandlerSnafu,
    MissingTimestampColumnSnafu, QueryExecutionSnafu, Result, TableMutationSnafu,
    TableNotFoundSnafu, TableReadOnlySnafu, UnsupportedExprSnafu,
};
use crate::executor::QueryExecutor;
use crate::metrics::{OnDone, QUERY_STAGE_ELAPSED};
use crate::physical_wrapper::PhysicalPlanWrapperRef;
use crate::planner::{DfLogicalPlanner, LogicalPlanner};
use crate::query_engine::{DescribeResult, QueryEngineContext, QueryEngineState};
use crate::{QueryEngine, metrics};

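/// Hint key for per-query parallelism: a positive integer value is applied as
/// DataFusion's target partition count in `engine_context`.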
pub const QUERY_PARALLELISM_HINT: &str = "query_parallelism";

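/// Hint key for distributed-planner fallback: a value parsing to `true` enables
/// `DistPlannerOptions::allow_query_fallback` in `engine_context`.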
pub const QUERY_FALLBACK_HINT: &str = "query_fallback";

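/// Query engine implementation backed by Apache DataFusion.
///
/// A minimal usage sketch (hypothetical setup; assumes a prepared
/// `QueryEngineState`, `Plugins`, parsed statement and query context):
/// ```ignore
/// let engine = DatafusionQueryEngine::new(state, plugins);
/// let plan = engine.planner().plan(&stmt, query_ctx.clone()).await?;
/// let output = engine.execute(plan, query_ctx).await?;
/// ```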
pub struct DatafusionQueryEngine {
    state: Arc<QueryEngineState>,
    plugins: Plugins,
}

impl DatafusionQueryEngine {
    pub fn new(state: Arc<QueryEngineState>, plugins: Plugins) -> Self {
        Self { state, plugins }
    }

    #[tracing::instrument(skip_all)]
    async fn exec_query_plan(
        &self,
        plan: LogicalPlan,
        query_ctx: QueryContextRef,
    ) -> Result<Output> {
        let mut ctx = self.engine_context(query_ctx.clone());

        let physical_plan = self.create_physical_plan(&mut ctx, &plan).await?;
        let optimized_physical_plan = self.optimize_physical_plan(&mut ctx, physical_plan)?;

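        // Let plugins (a registered `PhysicalPlanWrapperRef`, if any) decorate the
        // optimized physical plan before execution.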
        let physical_plan = if let Some(wrapper) = self.plugins.get::<PhysicalPlanWrapperRef>() {
            wrapper.wrap(optimized_physical_plan, query_ctx)
        } else {
            optimized_physical_plan
        };

        Ok(Output::new(
            OutputData::Stream(self.execute_stream(&ctx, &physical_plan)?),
            OutputMeta::new_with_plan(physical_plan),
        ))
    }

    #[tracing::instrument(skip_all)]
    async fn exec_dml_statement(
        &self,
        dml: DmlStatement,
        query_ctx: QueryContextRef,
    ) -> Result<Output> {
        ensure!(
            matches!(dml.op, WriteOp::Insert(_) | WriteOp::Delete),
            UnsupportedExprSnafu {
                name: format!("DML op {}", dml.op),
            }
        );

        let _timer = QUERY_STAGE_ELAPSED
            .with_label_values(&[dml.op.name()])
            .start_timer();

        let default_catalog = &query_ctx.current_catalog().to_owned();
        let default_schema = &query_ctx.current_schema();
        let table_name = dml.table_name.resolve(default_catalog, default_schema);
        let table = self.find_table(&table_name, &query_ctx).await?;

        let output = self
            .exec_query_plan((*dml.input).clone(), query_ctx.clone())
            .await?;
        let mut stream = match output.data {
            OutputData::RecordBatches(batches) => batches.as_stream(),
            OutputData::Stream(stream) => stream,
            _ => unreachable!(),
        };

        let mut affected_rows = 0;
        let mut insert_cost = 0;

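        // Stream the input plan's batches and apply each one as an insert or
        // delete against the target table, accumulating affected rows and cost.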
        while let Some(batch) = stream.next().await {
            let batch = batch.context(CreateRecordBatchSnafu)?;
            let column_vectors = batch
                .column_vectors(&table_name.to_string(), table.schema())
                .map_err(BoxedError::new)
                .context(QueryExecutionSnafu)?;

            match dml.op {
                WriteOp::Insert(_) => {
                    let output = self
                        .insert(&table_name, column_vectors, query_ctx.clone())
                        .await?;
                    let (rows, cost) = output.extract_rows_and_cost();
                    affected_rows += rows;
                    insert_cost += cost;
                }
                WriteOp::Delete => {
                    affected_rows += self
                        .delete(&table_name, &table, column_vectors, query_ctx.clone())
                        .await?;
                }
                _ => unreachable!("guarded by the 'ensure!' at the beginning"),
            }
        }
        Ok(Output::new(
            OutputData::AffectedRows(affected_rows),
            OutputMeta::new_with_cost(insert_cost),
        ))
    }

    #[tracing::instrument(skip_all)]
    async fn delete(
        &self,
        table_name: &ResolvedTableReference,
        table: &TableRef,
        column_vectors: HashMap<String, VectorRef>,
        query_ctx: QueryContextRef,
    ) -> Result<usize> {
        let catalog_name = table_name.catalog.to_string();
        let schema_name = table_name.schema.to_string();
        let table_name = table_name.table.to_string();
        let table_schema = table.schema();

        ensure!(
            !is_readonly_schema(&schema_name),
            TableReadOnlySnafu { table: table_name }
        );

        let ts_column = table_schema
            .timestamp_column()
            .map(|x| &x.name)
            .with_context(|| MissingTimestampColumnSnafu {
                table_name: table_name.to_string(),
            })?;

        let table_info = table.table_info();
        let rowkey_columns = table_info
            .meta
            .row_key_column_names()
            .collect::<Vec<&String>>();
        let column_vectors = column_vectors
            .into_iter()
            .filter(|x| &x.0 == ts_column || rowkey_columns.contains(&&x.0))
            .collect::<HashMap<_, _>>();

        let request = DeleteRequest {
            catalog_name,
            schema_name,
            table_name,
            key_column_values: column_vectors,
        };

        self.state
            .table_mutation_handler()
            .context(MissingTableMutationHandlerSnafu)?
            .delete(request, query_ctx)
            .await
            .context(TableMutationSnafu)
    }

    #[tracing::instrument(skip_all)]
    async fn insert(
        &self,
        table_name: &ResolvedTableReference,
        column_vectors: HashMap<String, VectorRef>,
        query_ctx: QueryContextRef,
    ) -> Result<Output> {
        let catalog_name = table_name.catalog.to_string();
        let schema_name = table_name.schema.to_string();
        let table_name = table_name.table.to_string();

        ensure!(
            !is_readonly_schema(&schema_name),
            TableReadOnlySnafu { table: table_name }
        );

        let request = InsertRequest {
            catalog_name,
            schema_name,
            table_name,
            columns_values: column_vectors,
        };

        self.state
            .table_mutation_handler()
            .context(MissingTableMutationHandlerSnafu)?
            .insert(request, query_ctx)
            .await
            .context(TableMutationSnafu)
    }

    async fn find_table(
        &self,
        table_name: &ResolvedTableReference,
        query_context: &QueryContextRef,
    ) -> Result<TableRef> {
        let catalog_name = table_name.catalog.as_ref();
        let schema_name = table_name.schema.as_ref();
        let table_name = table_name.table.as_ref();

        self.state
            .catalog_manager()
            .table(catalog_name, schema_name, table_name, Some(query_context))
            .await
            .context(CatalogSnafu)?
            .with_context(|| TableNotFoundSnafu { table: table_name })
    }

    #[tracing::instrument(skip_all)]
    async fn create_physical_plan(
        &self,
        ctx: &mut QueryEngineContext,
        logical_plan: &LogicalPlan,
    ) -> Result<Arc<dyn ExecutionPlan>> {
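        // Drop guard that logs the captured plans if plan creation panics; it
        // only emits output when the current thread is panicking.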
        #[derive(Debug)]
        struct PanicLogger<'a> {
            input_logical_plan: &'a LogicalPlan,
            after_analyze: Option<LogicalPlan>,
            after_optimize: Option<LogicalPlan>,
            phy_plan: Option<Arc<dyn ExecutionPlan>>,
        }
        impl Drop for PanicLogger<'_> {
            fn drop(&mut self) {
                if std::thread::panicking() {
                    common_telemetry::error!(
                        "Panic while creating physical plan, input logical plan: {:?}, after analyze: {:?}, after optimize: {:?}, final physical plan: {:?}",
                        self.input_logical_plan,
                        self.after_analyze,
                        self.after_optimize,
                        self.phy_plan
                    );
                }
            }
        }

        let mut logger = PanicLogger {
            input_logical_plan: logical_plan,
            after_analyze: None,
            after_optimize: None,
            phy_plan: None,
        };

        let _timer = metrics::CREATE_PHYSICAL_ELAPSED.start_timer();
        let state = ctx.state();

        common_telemetry::debug!("Create physical plan, input plan: {logical_plan}");

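        // EXPLAIN statements are handed directly to the session state's physical
        // planner and skip the explicit analyze/optimize steps below.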
        if matches!(logical_plan, DfLogicalPlan::Explain(_)) {
            return state
                .create_physical_plan(logical_plan)
                .await
                .context(error::DatafusionSnafu)
                .map_err(BoxedError::new)
                .context(QueryExecutionSnafu);
        }

        let analyzed_plan = state
            .analyzer()
            .execute_and_check(logical_plan.clone(), state.config_options(), |_, _| {})
            .context(error::DatafusionSnafu)
            .map_err(BoxedError::new)
            .context(QueryExecutionSnafu)?;

        logger.after_analyze = Some(analyzed_plan.clone());

        common_telemetry::debug!("Create physical plan, analyzed plan: {analyzed_plan}");

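        // Skip the logical optimizer when the plan is already a `MergeScan`
        // extension node; otherwise run the full optimizer pass.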
        let optimized_plan = if let DfLogicalPlan::Extension(ext) = &analyzed_plan
            && ext.node.name() == MergeScanLogicalPlan::name()
        {
            analyzed_plan.clone()
        } else {
            state
                .optimizer()
                .optimize(analyzed_plan, state, |_, _| {})
                .context(error::DatafusionSnafu)
                .map_err(BoxedError::new)
                .context(QueryExecutionSnafu)?
        };

        common_telemetry::debug!("Create physical plan, optimized plan: {optimized_plan}");
        logger.after_optimize = Some(optimized_plan.clone());

        let physical_plan = state
            .query_planner()
            .create_physical_plan(&optimized_plan, state)
            .await?;

        logger.phy_plan = Some(physical_plan.clone());
        drop(logger);
        Ok(physical_plan)
    }

    #[tracing::instrument(skip_all)]
    pub fn optimize(
        &self,
        context: &QueryEngineContext,
        plan: &LogicalPlan,
    ) -> Result<LogicalPlan> {
        let _timer = metrics::OPTIMIZE_LOGICAL_ELAPSED.start_timer();

        let optimized_plan = self
            .state
            .optimize_by_extension_rules(plan.clone(), context)
            .context(error::DatafusionSnafu)
            .map_err(BoxedError::new)
            .context(QueryExecutionSnafu)?;

        let optimized_plan = self
            .state
            .session_state()
            .optimize(&optimized_plan)
            .context(error::DatafusionSnafu)
            .map_err(BoxedError::new)
            .context(QueryExecutionSnafu)?;

        Ok(optimized_plan)
    }

    #[tracing::instrument(skip_all)]
    fn optimize_physical_plan(
        &self,
        ctx: &mut QueryEngineContext,
        plan: Arc<dyn ExecutionPlan>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let _timer = metrics::OPTIMIZE_PHYSICAL_ELAPSED.start_timer();

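        // Swap DataFusion's `AnalyzeExec` for the distributed-aware
        // `DistAnalyzeExec`, carrying over verbosity and the requested format.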
        let optimized_plan = if let Some(analyze_plan) = plan.as_any().downcast_ref::<AnalyzeExec>()
        {
            let format = if let Some(format) = ctx.query_ctx().explain_format()
                && format.to_lowercase() == "json"
            {
                AnalyzeFormat::JSON
            } else {
                AnalyzeFormat::TEXT
            };
            ctx.query_ctx().set_explain_verbose(analyze_plan.verbose());

            Arc::new(DistAnalyzeExec::new(
                analyze_plan.input().clone(),
                analyze_plan.verbose(),
                format,
            ))
        } else {
            plan
        };

        Ok(optimized_plan)
    }
}

#[async_trait]
impl QueryEngine for DatafusionQueryEngine {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn planner(&self) -> Arc<dyn LogicalPlanner> {
        Arc::new(DfLogicalPlanner::new(self.state.clone()))
    }

    fn name(&self) -> &str {
        "datafusion"
    }

    async fn describe(
        &self,
        plan: LogicalPlan,
        query_ctx: QueryContextRef,
    ) -> Result<DescribeResult> {
        let ctx = self.engine_context(query_ctx);
        if let Ok(optimised_plan) = self.optimize(&ctx, &plan) {
            let schema = optimised_plan
                .schema()
                .clone()
                .try_into()
                .context(ConvertSchemaSnafu)?;
            Ok(DescribeResult {
                schema,
                logical_plan: optimised_plan,
            })
        } else {
            let schema = plan
                .schema()
                .clone()
                .try_into()
                .context(ConvertSchemaSnafu)?;
            Ok(DescribeResult {
                schema,
                logical_plan: plan,
            })
        }
    }

    async fn execute(&self, plan: LogicalPlan, query_ctx: QueryContextRef) -> Result<Output> {
        match plan {
            LogicalPlan::Dml(dml) => self.exec_dml_statement(dml, query_ctx).await,
            _ => self.exec_query_plan(plan, query_ctx).await,
        }
    }

    fn register_aggregate_function(&self, func: AggregateUDF) {
        self.state.register_aggr_function(func);
    }

    fn register_scalar_function(&self, func: ScalarFunctionFactory) {
        self.state.register_scalar_function(func);
    }

    fn register_table_function(&self, func: Arc<TableFunction>) {
        self.state.register_table_function(func);
    }

    fn read_table(&self, table: TableRef) -> Result<DataFrame> {
        Ok(DataFrame::DataFusion(
            self.state
                .read_table(table)
                .context(error::DatafusionSnafu)
                .map_err(BoxedError::new)
                .context(QueryExecutionSnafu)?,
        ))
    }

    fn engine_context(&self, query_ctx: QueryContextRef) -> QueryEngineContext {
        let mut state = self.state.session_state();
        state.config_mut().set_extension(query_ctx.clone());
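        // Apply the per-query parallelism hint, if present and valid.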
        if let Some(parallelism) = query_ctx.extension(QUERY_PARALLELISM_HINT) {
            if let Ok(n) = parallelism.parse::<u64>() {
                if n > 0 {
                    let new_cfg = state.config().clone().with_target_partitions(n as usize);
                    *state.config_mut() = new_cfg;
                }
            } else {
                common_telemetry::warn!(
                    "Failed to parse query_parallelism: {}, using default value",
                    parallelism
                );
            }
        }

        state.config_mut().options_mut().execution.time_zone = query_ctx.timezone().to_string();

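        // Enable distributed-planner fallback either via the session configuration
        // parameter or the per-query fallback hint.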
        if query_ctx.configuration_parameter().allow_query_fallback() {
            state
                .config_mut()
                .options_mut()
                .extensions
                .insert(DistPlannerOptions {
                    allow_query_fallback: true,
                });
        } else if let Some(fallback) = query_ctx.extension(QUERY_FALLBACK_HINT) {
            if fallback.to_lowercase().parse::<bool>().unwrap_or(false) {
                state
                    .config_mut()
                    .options_mut()
                    .extensions
                    .insert(DistPlannerOptions {
                        allow_query_fallback: true,
                    });
            }
        }
        QueryEngineContext::new(state, query_ctx)
    }

    fn engine_state(&self) -> &QueryEngineState {
        &self.state
    }
}

impl QueryExecutor for DatafusionQueryEngine {
    #[tracing::instrument(skip_all)]
    fn execute_stream(
        &self,
        ctx: &QueryEngineContext,
        plan: &Arc<dyn ExecutionPlan>,
    ) -> Result<SendableRecordBatchStream> {
        let explain_verbose = ctx.query_ctx().explain_verbose();
        let output_partitions = plan.properties().output_partitioning().partition_count();
        if explain_verbose {
            common_telemetry::info!("Executing query plan, output_partitions: {output_partitions}");
        }

        let exec_timer = metrics::EXEC_PLAN_ELAPSED.start_timer();
        let task_ctx = ctx.build_task_ctx();

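        // Dispatch on the number of output partitions: an empty stream for zero,
        // direct execution for one, and coalesce-then-execute otherwise.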
        match plan.properties().output_partitioning().partition_count() {
            0 => {
                let schema = Arc::new(
                    Schema::try_from(plan.schema())
                        .map_err(BoxedError::new)
                        .context(QueryExecutionSnafu)?,
                );
                Ok(Box::pin(EmptyRecordBatchStream::new(schema)))
            }
            1 => {
                let df_stream = plan
                    .execute(0, task_ctx)
                    .context(error::DatafusionSnafu)
                    .map_err(BoxedError::new)
                    .context(QueryExecutionSnafu)?;
                let mut stream = RecordBatchStreamAdapter::try_new(df_stream)
                    .context(error::ConvertDfRecordBatchStreamSnafu)
                    .map_err(BoxedError::new)
                    .context(QueryExecutionSnafu)?;
                stream.set_metrics2(plan.clone());
                stream.set_explain_verbose(explain_verbose);
                let stream = OnDone::new(Box::pin(stream), move || {
                    let exec_cost = exec_timer.stop_and_record();
                    if explain_verbose {
                        common_telemetry::info!(
                            "DatafusionQueryEngine execute 1 stream, cost: {:?}s",
                            exec_cost,
                        );
                    }
                });
                Ok(Box::pin(stream))
            }
            _ => {
                let merged_plan = CoalescePartitionsExec::new(plan.clone());
                assert_eq!(
                    1,
                    merged_plan
                        .properties()
                        .output_partitioning()
                        .partition_count()
                );
                let df_stream = merged_plan
                    .execute(0, task_ctx)
                    .context(error::DatafusionSnafu)
                    .map_err(BoxedError::new)
                    .context(QueryExecutionSnafu)?;
                let mut stream = RecordBatchStreamAdapter::try_new(df_stream)
                    .context(error::ConvertDfRecordBatchStreamSnafu)
                    .map_err(BoxedError::new)
                    .context(QueryExecutionSnafu)?;
                stream.set_metrics2(plan.clone());
                stream.set_explain_verbose(ctx.query_ctx().explain_verbose());
                let stream = OnDone::new(Box::pin(stream), move || {
                    let exec_cost = exec_timer.stop_and_record();
                    if explain_verbose {
                        common_telemetry::info!(
                            "DatafusionQueryEngine execute {output_partitions} stream, cost: {:?}s",
                            exec_cost
                        );
                    }
                });
                Ok(Box::pin(stream))
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use catalog::RegisterTableRequest;
    use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, NUMBERS_TABLE_ID};
    use common_recordbatch::util;
    use datafusion::prelude::{col, lit};
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::ColumnSchema;
    use datatypes::vectors::{Helper, UInt32Vector, UInt64Vector, VectorRef};
    use session::context::{QueryContext, QueryContextBuilder};
    use table::table::numbers::{NUMBERS_TABLE_NAME, NumbersTable};

    use super::*;
    use crate::options::QueryOptions;
    use crate::parser::QueryLanguageParser;
    use crate::query_engine::{QueryEngineFactory, QueryEngineRef};

    async fn create_test_engine() -> QueryEngineRef {
        let catalog_manager = catalog::memory::new_memory_catalog_manager().unwrap();
        let req = RegisterTableRequest {
            catalog: DEFAULT_CATALOG_NAME.to_string(),
            schema: DEFAULT_SCHEMA_NAME.to_string(),
            table_name: NUMBERS_TABLE_NAME.to_string(),
            table_id: NUMBERS_TABLE_ID,
            table: NumbersTable::table(NUMBERS_TABLE_ID),
        };
        catalog_manager.register_table_sync(req).unwrap();

        QueryEngineFactory::new(
            catalog_manager,
            None,
            None,
            None,
            None,
            false,
            QueryOptions::default(),
        )
        .query_engine()
    }

    #[tokio::test]
    async fn test_sql_to_plan() {
        let engine = create_test_engine().await;
        let sql = "select sum(number) from numbers limit 20";

        let stmt = QueryLanguageParser::parse_sql(sql, &QueryContext::arc()).unwrap();
        let plan = engine
            .planner()
            .plan(&stmt, QueryContext::arc())
            .await
            .unwrap();

        assert_eq!(
            plan.to_string(),
            r#"Limit: skip=0, fetch=20
  Projection: sum(numbers.number)
    Aggregate: groupBy=[[]], aggr=[[sum(numbers.number)]]
      TableScan: numbers"#
        );
    }

    #[tokio::test]
    async fn test_execute() {
        let engine = create_test_engine().await;
        let sql = "select sum(number) from numbers limit 20";

        let stmt = QueryLanguageParser::parse_sql(sql, &QueryContext::arc()).unwrap();
        let plan = engine
            .planner()
            .plan(&stmt, QueryContext::arc())
            .await
            .unwrap();

        let output = engine.execute(plan, QueryContext::arc()).await.unwrap();

        match output.data {
            OutputData::Stream(recordbatch) => {
                let numbers = util::collect(recordbatch).await.unwrap();
                assert_eq!(1, numbers.len());
                assert_eq!(numbers[0].num_columns(), 1);
                assert_eq!(1, numbers[0].schema.num_columns());
                assert_eq!(
                    "sum(numbers.number)",
                    numbers[0].schema.column_schemas()[0].name
                );

                let batch = &numbers[0];
                assert_eq!(1, batch.num_columns());
                assert_eq!(batch.column(0).len(), 1);

                assert_eq!(
                    *batch.column(0),
                    Arc::new(UInt64Vector::from_slice([4950])) as VectorRef
                );
            }
            _ => unreachable!(),
        }
    }

    #[tokio::test]
    async fn test_read_table() {
        let engine = create_test_engine().await;

        let engine = engine
            .as_any()
            .downcast_ref::<DatafusionQueryEngine>()
            .unwrap();
        let query_ctx = Arc::new(QueryContextBuilder::default().build());
        let table = engine
            .find_table(
                &ResolvedTableReference {
                    catalog: "greptime".into(),
                    schema: "public".into(),
                    table: "numbers".into(),
                },
                &query_ctx,
            )
            .await
            .unwrap();

        let DataFrame::DataFusion(df) = engine.read_table(table).unwrap();
        let df = df
            .select_columns(&["number"])
            .unwrap()
            .filter(col("number").lt(lit(10)))
            .unwrap();
        let batches = df.collect().await.unwrap();
        assert_eq!(1, batches.len());
        let batch = &batches[0];

        assert_eq!(1, batch.num_columns());
        assert_eq!(batch.column(0).len(), 10);

        assert_eq!(
            Helper::try_into_vector(batch.column(0)).unwrap(),
            Arc::new(UInt32Vector::from_slice([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])) as VectorRef
        );
    }

    #[tokio::test]
    async fn test_describe() {
        let engine = create_test_engine().await;
        let sql = "select sum(number) from numbers limit 20";

        let stmt = QueryLanguageParser::parse_sql(sql, &QueryContext::arc()).unwrap();

        let plan = engine
            .planner()
            .plan(&stmt, QueryContext::arc())
            .await
            .unwrap();

        let DescribeResult {
            schema,
            logical_plan,
        } = engine.describe(plan, QueryContext::arc()).await.unwrap();

        assert_eq!(
            schema.column_schemas()[0],
            ColumnSchema::new(
                "sum(numbers.number)",
                ConcreteDataType::uint64_datatype(),
                true
            )
        );
        assert_eq!(
            "Limit: skip=0, fetch=20\n  Aggregate: groupBy=[[]], aggr=[[sum(CAST(numbers.number AS UInt64))]]\n    TableScan: numbers projection=[number]",
            format!("{}", logical_plan.display_indent())
        );
    }
}