mod error;
mod planner;

use std::any::Any;
use std::collections::HashMap;
use std::sync::Arc;

use async_trait::async_trait;
use common_base::Plugins;
use common_catalog::consts::is_readonly_schema;
use common_error::ext::BoxedError;
use common_function::function::FunctionContext;
use common_function::function_factory::ScalarFunctionFactory;
use common_query::{Output, OutputData, OutputMeta};
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::{EmptyRecordBatchStream, SendableRecordBatchStream};
use common_telemetry::tracing;
use datafusion::catalog::TableFunction;
use datafusion::dataframe::DataFrame;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_plan::analyze::AnalyzeExec;
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion_common::ResolvedTableReference;
use datafusion_expr::{
    AggregateUDF, DmlStatement, LogicalPlan as DfLogicalPlan, LogicalPlan, WriteOp,
};
use datatypes::prelude::VectorRef;
use datatypes::schema::Schema;
use futures_util::StreamExt;
use session::context::QueryContextRef;
use snafu::{OptionExt, ResultExt, ensure};
use sqlparser::ast::AnalyzeFormat;
use table::TableRef;
use table::requests::{DeleteRequest, InsertRequest};

use crate::analyze::DistAnalyzeExec;
pub use crate::datafusion::planner::DfContextProviderAdapter;
use crate::dist_plan::{DistPlannerOptions, MergeScanLogicalPlan};
use crate::error::{
    CatalogSnafu, ConvertSchemaSnafu, CreateRecordBatchSnafu, MissingTableMutationHandlerSnafu,
    MissingTimestampColumnSnafu, QueryExecutionSnafu, Result, TableMutationSnafu,
    TableNotFoundSnafu, TableReadOnlySnafu, UnsupportedExprSnafu,
};
use crate::executor::QueryExecutor;
use crate::metrics::{OnDone, QUERY_STAGE_ELAPSED};
use crate::physical_wrapper::PhysicalPlanWrapperRef;
use crate::planner::{DfLogicalPlanner, LogicalPlanner};
use crate::query_engine::{DescribeResult, QueryEngineContext, QueryEngineState};
use crate::{QueryEngine, metrics};

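/// Hint key for overriding a query's parallelism. The value is parsed as a
/// positive integer and applied as DataFusion's `target_partitions` (see
/// `engine_context` below).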
pub const QUERY_PARALLELISM_HINT: &str = "query_parallelism";

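/// Hint key for enabling query fallback in the distributed planner; a truthy
/// value sets `DistPlannerOptions::allow_query_fallback` (see `engine_context`
/// below).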
pub const QUERY_FALLBACK_HINT: &str = "query_fallback";

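/// A [`QueryEngine`] implementation backed by Apache DataFusion.
///
/// A minimal construction sketch (a hypothetical setup, assuming a prepared
/// `QueryEngineState` and that `Plugins` can be default-constructed):
/// ```ignore
/// let engine = DatafusionQueryEngine::new(state, Plugins::default());
/// let output = engine.execute(plan, query_ctx).await?;
/// ```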
pub struct DatafusionQueryEngine {
    state: Arc<QueryEngineState>,
    plugins: Plugins,
}

impl DatafusionQueryEngine {
    pub fn new(state: Arc<QueryEngineState>, plugins: Plugins) -> Self {
        Self { state, plugins }
    }

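    /// Executes a non-DML logical plan: creates the physical plan, runs the
    /// physical optimizer, applies the optional `PhysicalPlanWrapperRef`
    /// plugin, and returns the result as a record batch stream.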
    #[tracing::instrument(skip_all)]
    async fn exec_query_plan(
        &self,
        plan: LogicalPlan,
        query_ctx: QueryContextRef,
    ) -> Result<Output> {
        let mut ctx = self.engine_context(query_ctx.clone());

        let physical_plan = self.create_physical_plan(&mut ctx, &plan).await?;
        let optimized_physical_plan = self.optimize_physical_plan(&mut ctx, physical_plan)?;

        let physical_plan = if let Some(wrapper) = self.plugins.get::<PhysicalPlanWrapperRef>() {
            wrapper.wrap(optimized_physical_plan, query_ctx)
        } else {
            optimized_physical_plan
        };

        Ok(Output::new(
            OutputData::Stream(self.execute_stream(&ctx, &physical_plan)?),
            OutputMeta::new_with_plan(physical_plan),
        ))
    }

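    /// Executes an INSERT or DELETE statement by executing its input plan as a
    /// query and forwarding each resulting batch to the table mutation
    /// handler.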
    #[tracing::instrument(skip_all)]
    async fn exec_dml_statement(
        &self,
        dml: DmlStatement,
        query_ctx: QueryContextRef,
    ) -> Result<Output> {
        ensure!(
            matches!(dml.op, WriteOp::Insert(_) | WriteOp::Delete),
            UnsupportedExprSnafu {
                name: format!("DML op {}", dml.op),
            }
        );

        let _timer = QUERY_STAGE_ELAPSED
            .with_label_values(&[dml.op.name()])
            .start_timer();

        let default_catalog = &query_ctx.current_catalog().to_owned();
        let default_schema = &query_ctx.current_schema();
        let table_name = dml.table_name.resolve(default_catalog, default_schema);
        let table = self.find_table(&table_name, &query_ctx).await?;

        let output = self
            .exec_query_plan((*dml.input).clone(), query_ctx.clone())
            .await?;
        let mut stream = match output.data {
            OutputData::RecordBatches(batches) => batches.as_stream(),
            OutputData::Stream(stream) => stream,
            _ => unreachable!(),
        };

        let mut affected_rows = 0;
        let mut insert_cost = 0;

        while let Some(batch) = stream.next().await {
            let batch = batch.context(CreateRecordBatchSnafu)?;
            let column_vectors = batch
                .column_vectors(&table_name.to_string(), table.schema())
                .map_err(BoxedError::new)
                .context(QueryExecutionSnafu)?;

            match dml.op {
                WriteOp::Insert(_) => {
                    let output = self
                        .insert(&table_name, column_vectors, query_ctx.clone())
                        .await?;
                    let (rows, cost) = output.extract_rows_and_cost();
                    affected_rows += rows;
                    insert_cost += cost;
                }
                WriteOp::Delete => {
                    affected_rows += self
                        .delete(&table_name, &table, column_vectors, query_ctx.clone())
                        .await?;
                }
                _ => unreachable!("guarded by the 'ensure!' at the beginning"),
            }
        }
        Ok(Output::new(
            OutputData::AffectedRows(affected_rows),
            OutputMeta::new_with_cost(insert_cost),
        ))
    }

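    /// Deletes rows from a table. Only the timestamp column and the row-key
    /// columns of each batch are forwarded as key values; other columns are
    /// dropped. Read-only schemas are rejected.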
    #[tracing::instrument(skip_all)]
    async fn delete(
        &self,
        table_name: &ResolvedTableReference,
        table: &TableRef,
        column_vectors: HashMap<String, VectorRef>,
        query_ctx: QueryContextRef,
    ) -> Result<usize> {
        let catalog_name = table_name.catalog.to_string();
        let schema_name = table_name.schema.to_string();
        let table_name = table_name.table.to_string();
        let table_schema = table.schema();

        ensure!(
            !is_readonly_schema(&schema_name),
            TableReadOnlySnafu { table: table_name }
        );

        let ts_column = table_schema
            .timestamp_column()
            .map(|x| &x.name)
            .with_context(|| MissingTimestampColumnSnafu {
                table_name: table_name.clone(),
            })?;

        let table_info = table.table_info();
        let rowkey_columns = table_info
            .meta
            .row_key_column_names()
            .collect::<Vec<&String>>();
        let column_vectors = column_vectors
            .into_iter()
            .filter(|x| &x.0 == ts_column || rowkey_columns.contains(&&x.0))
            .collect::<HashMap<_, _>>();

        let request = DeleteRequest {
            catalog_name,
            schema_name,
            table_name,
            key_column_values: column_vectors,
        };

        self.state
            .table_mutation_handler()
            .context(MissingTableMutationHandlerSnafu)?
            .delete(request, query_ctx)
            .await
            .context(TableMutationSnafu)
    }

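    /// Inserts column vectors into a table via the table mutation handler.
    /// Read-only schemas are rejected.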
    #[tracing::instrument(skip_all)]
    async fn insert(
        &self,
        table_name: &ResolvedTableReference,
        column_vectors: HashMap<String, VectorRef>,
        query_ctx: QueryContextRef,
    ) -> Result<Output> {
        let catalog_name = table_name.catalog.to_string();
        let schema_name = table_name.schema.to_string();
        let table_name = table_name.table.to_string();

        ensure!(
            !is_readonly_schema(&schema_name),
            TableReadOnlySnafu { table: table_name }
        );

        let request = InsertRequest {
            catalog_name,
            schema_name,
            table_name,
            columns_values: column_vectors,
        };

        self.state
            .table_mutation_handler()
            .context(MissingTableMutationHandlerSnafu)?
            .insert(request, query_ctx)
            .await
            .context(TableMutationSnafu)
    }

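    /// Resolves a table reference through the catalog manager, returning a
    /// "table not found" error if it does not exist.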
    async fn find_table(
        &self,
        table_name: &ResolvedTableReference,
        query_context: &QueryContextRef,
    ) -> Result<TableRef> {
        let catalog_name = table_name.catalog.as_ref();
        let schema_name = table_name.schema.as_ref();
        let table_name = table_name.table.as_ref();

        self.state
            .catalog_manager()
            .table(catalog_name, schema_name, table_name, Some(query_context))
            .await
            .context(CatalogSnafu)?
            .with_context(|| TableNotFoundSnafu { table: table_name })
    }

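    /// Creates a physical plan by running DataFusion's analyzer rules, the
    /// logical optimizer (skipped when the plan is already a
    /// `MergeScanLogicalPlan` extension), and finally the physical planner.
    /// The `PanicLogger` guard below records every intermediate plan so that a
    /// panic in any stage logs the full planning lineage.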
    #[tracing::instrument(skip_all)]
    async fn create_physical_plan(
        &self,
        ctx: &mut QueryEngineContext,
        logical_plan: &LogicalPlan,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        #[derive(Debug)]
        struct PanicLogger<'a> {
            input_logical_plan: &'a LogicalPlan,
            after_analyze: Option<LogicalPlan>,
            after_optimize: Option<LogicalPlan>,
            phy_plan: Option<Arc<dyn ExecutionPlan>>,
        }
        impl Drop for PanicLogger<'_> {
            fn drop(&mut self) {
                if std::thread::panicking() {
                    common_telemetry::error!(
                        "Panic while creating physical plan, input logical plan: {:?}, after analyze: {:?}, after optimize: {:?}, final physical plan: {:?}",
                        self.input_logical_plan,
                        self.after_analyze,
                        self.after_optimize,
                        self.phy_plan
                    );
                }
            }
        }

        let mut logger = PanicLogger {
            input_logical_plan: logical_plan,
            after_analyze: None,
            after_optimize: None,
            phy_plan: None,
        };

        let _timer = metrics::CREATE_PHYSICAL_ELAPSED.start_timer();
        let state = ctx.state();

        common_telemetry::debug!("Create physical plan, input plan: {logical_plan}");

        if matches!(logical_plan, DfLogicalPlan::Explain(_)) {
            return state
                .create_physical_plan(logical_plan)
                .await
                .context(error::DatafusionSnafu)
                .map_err(BoxedError::new)
                .context(QueryExecutionSnafu);
        }

        let analyzed_plan = state
            .analyzer()
            .execute_and_check(logical_plan.clone(), state.config_options(), |_, _| {})
            .context(error::DatafusionSnafu)
            .map_err(BoxedError::new)
            .context(QueryExecutionSnafu)?;

        logger.after_analyze = Some(analyzed_plan.clone());

        common_telemetry::debug!("Create physical plan, analyzed plan: {analyzed_plan}");

        let optimized_plan = if let DfLogicalPlan::Extension(ext) = &analyzed_plan
            && ext.node.name() == MergeScanLogicalPlan::name()
        {
            analyzed_plan.clone()
        } else {
            state
                .optimizer()
                .optimize(analyzed_plan, state, |_, _| {})
                .context(error::DatafusionSnafu)
                .map_err(BoxedError::new)
                .context(QueryExecutionSnafu)?
        };

        common_telemetry::debug!("Create physical plan, optimized plan: {optimized_plan}");
        logger.after_optimize = Some(optimized_plan.clone());

        let physical_plan = state
            .query_planner()
            .create_physical_plan(&optimized_plan, state)
            .await?;

        logger.phy_plan = Some(physical_plan.clone());
        drop(logger);
        Ok(physical_plan)
    }

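    /// Optimizes a logical plan with this crate's extension rules, then with
    /// DataFusion's session optimizer.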
    #[tracing::instrument(skip_all)]
    pub fn optimize(
        &self,
        context: &QueryEngineContext,
        plan: &LogicalPlan,
    ) -> Result<LogicalPlan> {
        let _timer = metrics::OPTIMIZE_LOGICAL_ELAPSED.start_timer();

        let optimized_plan = self
            .state
            .optimize_by_extension_rules(plan.clone(), context)
            .context(error::DatafusionSnafu)
            .map_err(BoxedError::new)
            .context(QueryExecutionSnafu)?;

        let optimized_plan = self
            .state
            .session_state()
            .optimize(&optimized_plan)
            .context(error::DatafusionSnafu)
            .map_err(BoxedError::new)
            .context(QueryExecutionSnafu)?;

        Ok(optimized_plan)
    }

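    /// Rewrites a top-level `AnalyzeExec` (produced by `EXPLAIN ANALYZE`) into
    /// a `DistAnalyzeExec` so that execution metrics are collected; all other
    /// plans pass through unchanged.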
    #[tracing::instrument(skip_all)]
    fn optimize_physical_plan(
        &self,
        ctx: &mut QueryEngineContext,
        plan: Arc<dyn ExecutionPlan>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let _timer = metrics::OPTIMIZE_PHYSICAL_ELAPSED.start_timer();

        let optimized_plan = if let Some(analyze_plan) = plan.as_any().downcast_ref::<AnalyzeExec>()
        {
            let format = if let Some(format) = ctx.query_ctx().explain_format()
                && format.to_lowercase() == "json"
            {
                AnalyzeFormat::JSON
            } else {
                AnalyzeFormat::TEXT
            };
            ctx.query_ctx().set_explain_verbose(analyze_plan.verbose());

            Arc::new(DistAnalyzeExec::new(
                analyze_plan.input().clone(),
                analyze_plan.verbose(),
                format,
            ))
        } else {
            plan
        };

        Ok(optimized_plan)
    }
}

#[async_trait]
impl QueryEngine for DatafusionQueryEngine {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn planner(&self) -> Arc<dyn LogicalPlanner> {
        Arc::new(DfLogicalPlanner::new(self.state.clone()))
    }

    fn name(&self) -> &str {
        "datafusion"
    }

    async fn describe(
        &self,
        plan: LogicalPlan,
        query_ctx: QueryContextRef,
    ) -> Result<DescribeResult> {
        let ctx = self.engine_context(query_ctx);
        if let Ok(optimised_plan) = self.optimize(&ctx, &plan) {
            let schema = optimised_plan
                .schema()
                .clone()
                .try_into()
                .context(ConvertSchemaSnafu)?;
            Ok(DescribeResult {
                schema,
                logical_plan: optimised_plan,
            })
        } else {
            let schema = plan
                .schema()
                .clone()
                .try_into()
                .context(ConvertSchemaSnafu)?;
            Ok(DescribeResult {
                schema,
                logical_plan: plan,
            })
        }
    }

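    /// A minimal usage sketch (mirroring the tests below; `plan` comes from
    /// [`Self::planner`]):
    /// ```ignore
    /// let output = engine.execute(plan, QueryContext::arc()).await?;
    /// match output.data {
    ///     OutputData::Stream(stream) => { /* collect the record batches */ }
    ///     OutputData::AffectedRows(n) => println!("{n} rows affected"),
    ///     _ => {}
    /// }
    /// ```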
    async fn execute(&self, plan: LogicalPlan, query_ctx: QueryContextRef) -> Result<Output> {
        match plan {
            LogicalPlan::Dml(dml) => self.exec_dml_statement(dml, query_ctx).await,
            _ => self.exec_query_plan(plan, query_ctx).await,
        }
    }

    /// Registers an aggregate UDF with the engine.
    fn register_aggregate_function(&self, func: AggregateUDF) {
        self.state.register_aggr_function(func);
    }

    /// Registers a scalar function factory with the engine.
    fn register_scalar_function(&self, func: ScalarFunctionFactory) {
        self.state.register_scalar_function(func);
    }

    /// Registers a table function with the engine.
    fn register_table_function(&self, func: Arc<TableFunction>) {
        self.state.register_table_function(func);
    }

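    /// A minimal usage sketch (mirroring `test_read_table` below):
    /// ```ignore
    /// let df = engine.read_table(table)?;
    /// let batches = df.filter(col("number").lt(lit(10)))?.collect().await?;
    /// ```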
    fn read_table(&self, table: TableRef) -> Result<DataFrame> {
        self.state
            .read_table(table)
            .context(error::DatafusionSnafu)
            .map_err(BoxedError::new)
            .context(QueryExecutionSnafu)
    }

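    /// Builds a per-query [`QueryEngineContext`]: applies the
    /// `query_parallelism` and `query_fallback` hints, the session time zone,
    /// and the function execution context to a fresh session state.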
    fn engine_context(&self, query_ctx: QueryContextRef) -> QueryEngineContext {
        let mut state = self.state.session_state();
        state.config_mut().set_extension(query_ctx.clone());
        if let Some(parallelism) = query_ctx.extension(QUERY_PARALLELISM_HINT) {
            if let Ok(n) = parallelism.parse::<u64>() {
                if n > 0 {
                    let new_cfg = state.config().clone().with_target_partitions(n as usize);
                    *state.config_mut() = new_cfg;
                }
            } else {
                common_telemetry::warn!(
                    "Failed to parse query_parallelism: {}, using default value",
                    parallelism
                );
            }
        }

        state.config_mut().options_mut().execution.time_zone = query_ctx.timezone().to_string();

        if query_ctx.configuration_parameter().allow_query_fallback() {
            state
                .config_mut()
                .options_mut()
                .extensions
                .insert(DistPlannerOptions {
                    allow_query_fallback: true,
                });
        } else if let Some(fallback) = query_ctx.extension(QUERY_FALLBACK_HINT) {
            if fallback.to_lowercase().parse::<bool>().unwrap_or(false) {
                state
                    .config_mut()
                    .options_mut()
                    .extensions
                    .insert(DistPlannerOptions {
                        allow_query_fallback: true,
                    });
            }
        }

        state
            .config_mut()
            .options_mut()
            .extensions
            .insert(FunctionContext {
                query_ctx: query_ctx.clone(),
                state: self.engine_state().function_state(),
            });

        let config_options = state.config_options().clone();
        let _ = state
            .execution_props_mut()
            .config_options
            .insert(config_options);

        QueryEngineContext::new(state, query_ctx)
    }

    fn engine_state(&self) -> &QueryEngineState {
        &self.state
    }
}

impl QueryExecutor for DatafusionQueryEngine {
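    /// Executes a physical plan as a single [`SendableRecordBatchStream`]:
    /// zero-partition plans yield an empty stream, single-partition plans
    /// execute directly, and multi-partition plans are coalesced into one
    /// partition first.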
    #[tracing::instrument(skip_all)]
    fn execute_stream(
        &self,
        ctx: &QueryEngineContext,
        plan: &Arc<dyn ExecutionPlan>,
    ) -> Result<SendableRecordBatchStream> {
        let explain_verbose = ctx.query_ctx().explain_verbose();
        let output_partitions = plan.properties().output_partitioning().partition_count();
        if explain_verbose {
            common_telemetry::info!("Executing query plan, output_partitions: {output_partitions}");
        }

        let exec_timer = metrics::EXEC_PLAN_ELAPSED.start_timer();
        let task_ctx = ctx.build_task_ctx();

        match plan.properties().output_partitioning().partition_count() {
            0 => {
                let schema = Arc::new(
                    Schema::try_from(plan.schema())
                        .map_err(BoxedError::new)
                        .context(QueryExecutionSnafu)?,
                );
                Ok(Box::pin(EmptyRecordBatchStream::new(schema)))
            }
            1 => {
                let df_stream = plan
                    .execute(0, task_ctx)
                    .context(error::DatafusionSnafu)
                    .map_err(BoxedError::new)
                    .context(QueryExecutionSnafu)?;
                let mut stream = RecordBatchStreamAdapter::try_new(df_stream)
                    .context(error::ConvertDfRecordBatchStreamSnafu)
                    .map_err(BoxedError::new)
                    .context(QueryExecutionSnafu)?;
                stream.set_metrics2(plan.clone());
                stream.set_explain_verbose(explain_verbose);
                let stream = OnDone::new(Box::pin(stream), move || {
                    let exec_cost = exec_timer.stop_and_record();
                    if explain_verbose {
                        common_telemetry::info!(
                            "DatafusionQueryEngine execute 1 stream, cost: {:?}s",
                            exec_cost,
                        );
                    }
                });
                Ok(Box::pin(stream))
            }
            _ => {
                let merged_plan = CoalescePartitionsExec::new(plan.clone());
                assert_eq!(
                    1,
                    merged_plan
                        .properties()
                        .output_partitioning()
                        .partition_count()
                );
                let df_stream = merged_plan
                    .execute(0, task_ctx)
                    .context(error::DatafusionSnafu)
                    .map_err(BoxedError::new)
                    .context(QueryExecutionSnafu)?;
                let mut stream = RecordBatchStreamAdapter::try_new(df_stream)
                    .context(error::ConvertDfRecordBatchStreamSnafu)
                    .map_err(BoxedError::new)
                    .context(QueryExecutionSnafu)?;
                stream.set_metrics2(plan.clone());
                stream.set_explain_verbose(ctx.query_ctx().explain_verbose());
                let stream = OnDone::new(Box::pin(stream), move || {
                    let exec_cost = exec_timer.stop_and_record();
                    if explain_verbose {
                        common_telemetry::info!(
                            "DatafusionQueryEngine execute {output_partitions} stream, cost: {:?}s",
                            exec_cost
                        );
                    }
                });
                Ok(Box::pin(stream))
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow::array::{ArrayRef, UInt64Array};
    use catalog::RegisterTableRequest;
    use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, NUMBERS_TABLE_ID};
    use common_recordbatch::util;
    use datafusion::prelude::{col, lit};
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::ColumnSchema;
    use datatypes::vectors::{Helper, UInt32Vector, VectorRef};
    use session::context::{QueryContext, QueryContextBuilder};
    use table::table::numbers::{NUMBERS_TABLE_NAME, NumbersTable};

    use super::*;
    use crate::options::QueryOptions;
    use crate::parser::QueryLanguageParser;
    use crate::query_engine::{QueryEngineFactory, QueryEngineRef};

    async fn create_test_engine() -> QueryEngineRef {
        let catalog_manager = catalog::memory::new_memory_catalog_manager().unwrap();
        let req = RegisterTableRequest {
            catalog: DEFAULT_CATALOG_NAME.to_string(),
            schema: DEFAULT_SCHEMA_NAME.to_string(),
            table_name: NUMBERS_TABLE_NAME.to_string(),
            table_id: NUMBERS_TABLE_ID,
            table: NumbersTable::table(NUMBERS_TABLE_ID),
        };
        catalog_manager.register_table_sync(req).unwrap();

        QueryEngineFactory::new(
            catalog_manager,
            None,
            None,
            None,
            None,
            false,
            QueryOptions::default(),
        )
        .query_engine()
    }

    #[tokio::test]
    async fn test_sql_to_plan() {
        let engine = create_test_engine().await;
        let sql = "select sum(number) from numbers limit 20";

        let stmt = QueryLanguageParser::parse_sql(sql, &QueryContext::arc()).unwrap();
        let plan = engine
            .planner()
            .plan(&stmt, QueryContext::arc())
            .await
            .unwrap();

        assert_eq!(
            plan.to_string(),
            r#"Limit: skip=0, fetch=20
  Projection: sum(numbers.number)
    Aggregate: groupBy=[[]], aggr=[[sum(numbers.number)]]
      TableScan: numbers"#
        );
    }

    #[tokio::test]
    async fn test_execute() {
        let engine = create_test_engine().await;
        let sql = "select sum(number) from numbers limit 20";

        let stmt = QueryLanguageParser::parse_sql(sql, &QueryContext::arc()).unwrap();
        let plan = engine
            .planner()
            .plan(&stmt, QueryContext::arc())
            .await
            .unwrap();

        let output = engine.execute(plan, QueryContext::arc()).await.unwrap();

        match output.data {
            OutputData::Stream(recordbatch) => {
                let numbers = util::collect(recordbatch).await.unwrap();
                assert_eq!(1, numbers.len());
                assert_eq!(numbers[0].num_columns(), 1);
                assert_eq!(1, numbers[0].schema.num_columns());
                assert_eq!(
                    "sum(numbers.number)",
                    numbers[0].schema.column_schemas()[0].name
                );

                let batch = &numbers[0];
                assert_eq!(1, batch.num_columns());
                assert_eq!(batch.column(0).len(), 1);

                let expected = Arc::new(UInt64Array::from_iter_values([4950])) as ArrayRef;
                assert_eq!(batch.column(0), &expected);
            }
            _ => unreachable!(),
        }
    }

    #[tokio::test]
    async fn test_read_table() {
        let engine = create_test_engine().await;

        let engine = engine
            .as_any()
            .downcast_ref::<DatafusionQueryEngine>()
            .unwrap();
        let query_ctx = Arc::new(QueryContextBuilder::default().build());
        let table = engine
            .find_table(
                &ResolvedTableReference {
                    catalog: "greptime".into(),
                    schema: "public".into(),
                    table: "numbers".into(),
                },
                &query_ctx,
            )
            .await
            .unwrap();

        let df = engine.read_table(table).unwrap();
        let df = df
            .select_columns(&["number"])
            .unwrap()
            .filter(col("number").lt(lit(10)))
            .unwrap();
        let batches = df.collect().await.unwrap();
        assert_eq!(1, batches.len());
        let batch = &batches[0];

        assert_eq!(1, batch.num_columns());
        assert_eq!(batch.column(0).len(), 10);

        assert_eq!(
            Helper::try_into_vector(batch.column(0)).unwrap(),
            Arc::new(UInt32Vector::from_slice([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])) as VectorRef
        );
    }

    #[tokio::test]
    async fn test_describe() {
        let engine = create_test_engine().await;
        let sql = "select sum(number) from numbers limit 20";

        let stmt = QueryLanguageParser::parse_sql(sql, &QueryContext::arc()).unwrap();

        let plan = engine
            .planner()
            .plan(&stmt, QueryContext::arc())
            .await
            .unwrap();

        let DescribeResult {
            schema,
            logical_plan,
        } = engine.describe(plan, QueryContext::arc()).await.unwrap();

        assert_eq!(
            schema.column_schemas()[0],
            ColumnSchema::new(
                "sum(numbers.number)",
                ConcreteDataType::uint64_datatype(),
                true
            )
        );
        assert_eq!(
            "Limit: skip=0, fetch=20\n  Aggregate: groupBy=[[]], aggr=[[sum(CAST(numbers.number AS UInt64))]]\n    TableScan: numbers projection=[number]",
            format!("{}", logical_plan.display_indent())
        );
    }
}