1use itertools::Itertools;
16use snafu::OptionExt;
17use substrait_proto::proto;
18use substrait_proto::proto::aggregate_function::AggregationInvocation;
19use substrait_proto::proto::aggregate_rel::{Grouping, Measure};
20use substrait_proto::proto::function_argument::ArgType;
21
22use crate::error::{Error, NotImplementedSnafu, PlanSnafu};
23use crate::expr::{
24 AggregateExpr, AggregateFunc, MapFilterProject, ScalarExpr, TypedExpr, UnaryFunc,
25};
26use crate::plan::{AccumulablePlan, AggrWithIndex, KeyValPlan, Plan, ReducePlan, TypedPlan};
27use crate::repr::{ColumnType, RelationDesc, RelationType};
28use crate::transform::{FlownodeContext, FunctionExtensions, substrait_proto};
29
30impl TypedExpr {
31 #[allow(deprecated)]
33 async fn from_substrait_agg_grouping(
34 ctx: &mut FlownodeContext,
35 grouping_expressions: &[proto::Expression],
36 groupings: &[Grouping],
37 typ: &RelationDesc,
38 extensions: &FunctionExtensions,
39 ) -> Result<Vec<TypedExpr>, Error> {
40 let _ = ctx;
41 let mut group_expr = vec![];
42 match groupings.len() {
43 1 => {
44 let expressions: Box<dyn Iterator<Item = &proto::Expression> + Send> = if groupings
46 [0]
47 .expression_references
48 .is_empty()
49 {
50 Box::new(groupings[0].grouping_expressions.iter())
51 } else {
52 if groupings[0]
53 .expression_references
54 .iter()
55 .any(|idx| *idx as usize >= grouping_expressions.len())
56 {
57 return PlanSnafu {
58 reason: format!("Invalid grouping expression reference: {:?} for grouping expr: {:?}",
59 groupings[0].expression_references,
60 grouping_expressions
61 ),
62 }.fail()?;
63 }
64 Box::new(
65 groupings[0]
66 .expression_references
67 .iter()
68 .map(|idx| &grouping_expressions[*idx as usize]),
69 )
70 };
71 for e in expressions {
72 let x = TypedExpr::from_substrait_rex(e, typ, extensions).await?;
73 group_expr.push(x);
74 }
75 }
76 _ => {
77 return not_impl_err!(
78 "Grouping sets not support yet, use union all with group by instead."
79 );
80 }
81 };
82 Ok(group_expr)
83 }
84}
85
86impl AggregateExpr {
87 async fn from_substrait_agg_measures(
91 ctx: &mut FlownodeContext,
92 measures: &[Measure],
93 typ: &RelationDesc,
94 extensions: &FunctionExtensions,
95 ) -> Result<Vec<AggregateExpr>, Error> {
96 let _ = ctx;
97 let mut all_aggr_exprs = vec![];
98
99 for m in measures {
100 let filter = match m
101 .filter
102 .as_ref()
103 .map(|fil| TypedExpr::from_substrait_rex(fil, typ, extensions))
104 {
105 Some(fut) => Some(fut.await),
106 None => None,
107 }
108 .transpose()?;
109
110 let aggr_expr = match &m.measure {
111 Some(f) => {
112 let distinct = match f.invocation {
113 _ if f.invocation == AggregationInvocation::Distinct as i32 => true,
114 _ if f.invocation == AggregationInvocation::All as i32 => false,
115 _ => false,
116 };
117 AggregateExpr::from_substrait_agg_func(
118 f, typ, extensions, &filter, &None, distinct,
120 )
121 .await?
122 }
123 None => {
124 return not_impl_err!("Aggregate without aggregate function is not supported");
125 }
126 };
127
128 all_aggr_exprs.extend(aggr_expr);
129 }
130
131 Ok(all_aggr_exprs)
132 }
133
134 pub async fn from_substrait_agg_func(
139 f: &proto::AggregateFunction,
140 input_schema: &RelationDesc,
141 extensions: &FunctionExtensions,
142 filter: &Option<TypedExpr>,
143 order_by: &Option<Vec<TypedExpr>>,
144 distinct: bool,
145 ) -> Result<Vec<AggregateExpr>, Error> {
146 let _ = filter;
148 let _ = order_by;
149 let mut args = vec![];
150 for arg in &f.arguments {
151 let arg_expr = match &arg.arg_type {
152 Some(ArgType::Value(e)) => {
153 TypedExpr::from_substrait_rex(e, input_schema, extensions).await
154 }
155 _ => not_impl_err!("Aggregated function argument non-Value type not supported"),
156 }?;
157 args.push(arg_expr);
158 }
159
160 if args.len() != 1 {
161 let fn_name = extensions.get(&f.function_reference).cloned();
162 return not_impl_err!(
163 "Aggregated function (name={:?}) with multiple arguments is not supported",
164 fn_name
165 );
166 }
167
168 let arg = if let Some(first) = args.first() {
169 first
170 } else {
171 return not_impl_err!("Aggregated function without arguments is not supported");
172 };
173
174 let fn_name = extensions
175 .get(&f.function_reference)
176 .cloned()
177 .map(|s| s.to_lowercase());
178
179 match fn_name.as_ref().map(|s| s.as_ref()) {
180 Some(function_name) => {
181 let func = AggregateFunc::from_str_and_type(
182 function_name,
183 Some(arg.typ.scalar_type.clone()),
184 )?;
185 let exprs = vec![AggregateExpr {
186 func,
187 expr: arg.expr.clone(),
188 distinct,
189 }];
190 Ok(exprs)
191 }
192 None => not_impl_err!(
193 "Aggregated function not found: function anchor = {:?}",
194 f.function_reference
195 ),
196 }
197 }
198}
199
200impl KeyValPlan {
201 fn from_substrait_gen_key_val_plan(
205 aggr_exprs: &mut [AggregateExpr],
206 group_exprs: &[TypedExpr],
207 input_arity: usize,
208 ) -> Result<KeyValPlan, Error> {
209 let group_expr_val = group_exprs
210 .iter()
211 .map(|expr| expr.expr.clone())
212 .collect_vec();
213 let output_arity = group_expr_val.len();
214 let key_plan = MapFilterProject::new(input_arity)
215 .map(group_expr_val)?
216 .project(input_arity..input_arity + output_arity)?;
217
218 let val_plan = {
221 let need_mfp = aggr_exprs.iter().any(|agg| agg.expr.as_column().is_none());
222 if need_mfp {
223 let input_exprs = aggr_exprs
225 .iter_mut()
226 .enumerate()
227 .map(|(idx, aggr)| {
228 let ret = aggr.expr.clone();
229 aggr.expr = ScalarExpr::Column(idx);
230 ret
231 })
232 .collect_vec();
233 let aggr_arity = aggr_exprs.len();
234
235 MapFilterProject::new(input_arity)
236 .map(input_exprs)?
237 .project(input_arity..input_arity + aggr_arity)?
238 } else {
239 MapFilterProject::new(input_arity)
241 }
242 };
243 Ok(KeyValPlan {
244 key_plan: key_plan.into_safe(),
245 val_plan: val_plan.into_safe(),
246 })
247 }
248}
249
250fn find_time_index_in_group_exprs(group_exprs: &[TypedExpr]) -> Option<usize> {
254 group_exprs.iter().position(|expr| {
255 matches!(
256 &expr.expr,
257 ScalarExpr::CallUnary {
258 func: UnaryFunc::TumbleWindowFloor { .. },
259 expr: _
260 }
261 ) || expr.typ.scalar_type.is_timestamp()
262 })
263}
264
265impl TypedPlan {
266 #[async_recursion::async_recursion]
272 pub async fn from_substrait_agg_rel(
273 ctx: &mut FlownodeContext,
274 agg: &proto::AggregateRel,
275 extensions: &FunctionExtensions,
276 ) -> Result<TypedPlan, Error> {
277 let input = if let Some(input) = agg.input.as_ref() {
278 TypedPlan::from_substrait_rel(ctx, input, extensions).await?
279 } else {
280 return not_impl_err!("Aggregate without an input is not supported");
281 };
282
283 let group_exprs = TypedExpr::from_substrait_agg_grouping(
284 ctx,
285 &agg.grouping_expressions,
286 &agg.groupings,
287 &input.schema,
288 extensions,
289 )
290 .await?;
291
292 let time_index = find_time_index_in_group_exprs(&group_exprs);
293
294 let mut aggr_exprs = AggregateExpr::from_substrait_agg_measures(
295 ctx,
296 &agg.measures,
297 &input.schema,
298 extensions,
299 )
300 .await?;
301
302 let key_val_plan = KeyValPlan::from_substrait_gen_key_val_plan(
303 &mut aggr_exprs,
304 &group_exprs,
305 input.schema.typ.column_types.len(),
306 )?;
307
308 let output_type = {
310 let mut output_types = Vec::new();
311 let mut output_names = Vec::new();
313
314 for expr in group_exprs.iter() {
316 output_types.push(expr.typ.clone());
317 let col_name = match &expr.expr {
318 ScalarExpr::Column(col) => input.schema.get_name(*col).clone(),
319 _ => None,
321 };
322 output_names.push(col_name)
323 }
324
325 for aggr in &aggr_exprs {
326 output_types.push(ColumnType::new_nullable(
327 aggr.func.signature().output.clone(),
328 ));
329 output_names.push(None);
331 }
332 if group_exprs.is_empty() {
334 RelationType::new(output_types)
335 } else {
336 RelationType::new(output_types).with_key((0..group_exprs.len()).collect_vec())
337 }
338 .with_time_index(time_index)
339 .into_named(output_names)
340 };
341
342 let full_aggrs = aggr_exprs;
345 let mut simple_aggrs = Vec::new();
346 let mut distinct_aggrs = Vec::new();
347 for (output_column, aggr_expr) in full_aggrs.iter().enumerate() {
348 let input_column = aggr_expr.expr.as_column().with_context(|| PlanSnafu {
349 reason: "Expect aggregate argument to be transformed into a column at this point",
350 })?;
351 if aggr_expr.distinct {
352 distinct_aggrs.push(AggrWithIndex::new(
353 aggr_expr.clone(),
354 input_column,
355 output_column,
356 ));
357 } else {
358 simple_aggrs.push(AggrWithIndex::new(
359 aggr_expr.clone(),
360 input_column,
361 output_column,
362 ));
363 }
364 }
365 let accum_plan = AccumulablePlan {
366 full_aggrs,
367 simple_aggrs,
368 distinct_aggrs,
369 };
370 let plan = Plan::Reduce {
371 input: Box::new(input),
372 key_val_plan,
373 reduce_plan: ReducePlan::Accumulable(accum_plan),
374 };
375 return Ok(TypedPlan {
377 schema: output_type,
378 plan,
379 });
380 }
381}
382
#[cfg(test)]
mod test {

    use bytes::BytesMut;
    use common_time::IntervalMonthDayNano;
    use datatypes::data_type::ConcreteDataType as CDT;
    use datatypes::prelude::ConcreteDataType;
    use datatypes::value::Value;
    use pretty_assertions::assert_eq;

    use super::*;
    use crate::expr::{BinaryFunc, DfScalarFunction, GlobalId, RawDfScalarFn};
    use crate::plan::{Plan, TypedPlan};
    use crate::repr::{ColumnType, RelationType};
    use crate::transform::test::{create_test_ctx, create_test_query_engine, sql_to_substrait};

    /// Compiles `sql` with the test query engine and lowers the resulting substrait plan
    /// into a flow plan, panicking if the lowering fails.
    async fn flow_plan_for(sql: &str) -> TypedPlan {
        let engine = create_test_query_engine();
        let substrait = sql_to_substrait(engine.clone(), sql).await;

        let mut ctx = create_test_ctx();
        TypedPlan::from_substrait_plan(&mut ctx, &substrait)
            .await
            .unwrap()
    }

    /// Schema of the `numbers` test table: a single non-null `number` column of `uint32`.
    fn numbers_desc() -> RelationDesc {
        RelationType::new(vec![ColumnType::new(
            ConcreteDataType::uint32_datatype(),
            false,
        )])
        .into_named(vec![Some("number".to_string())])
    }

    /// Scan of the `numbers` test table wrapped in an identity map-filter-project.
    fn numbers_source() -> TypedPlan {
        Plan::Get {
            id: crate::expr::Id::Global(GlobalId::User(0)),
        }
        .with_types(numbers_desc())
        .mfp(MapFilterProject::new(1).into_safe())
        .unwrap()
    }

    /// `sum` over column 0 with `uint64` accumulation, non-distinct.
    fn sum_u64_of_first_column() -> AggregateExpr {
        AggregateExpr {
            func: AggregateFunc::SumUInt64,
            expr: ScalarExpr::Column(0),
            distinct: false,
        }
    }

    /// Value plan over one input column that emits only `value_expr`.
    fn single_value_plan(value_expr: ScalarExpr) -> MapFilterProject {
        MapFilterProject::new(1)
            .map(vec![value_expr])
            .unwrap()
            .project(vec![1])
            .unwrap()
    }

    #[tokio::test]
    async fn test_sum() {
        let actual = flow_plan_for("SELECT sum(number) FROM numbers").await;

        let sum = sum_u64_of_first_column();
        let casted_number =
            ScalarExpr::Column(0).call_unary(UnaryFunc::Cast(CDT::uint64_datatype()));

        let expected = TypedPlan {
            schema: RelationType::new(vec![ColumnType::new(CDT::uint64_datatype(), true)])
                .into_named(vec![Some("sum(numbers.number)".to_string())]),
            plan: Plan::Reduce {
                input: Box::new(numbers_source()),
                key_val_plan: KeyValPlan {
                    key_plan: MapFilterProject::new(1)
                        .project(vec![])
                        .unwrap()
                        .into_safe(),
                    val_plan: single_value_plan(casted_number).into_safe(),
                },
                reduce_plan: ReducePlan::Accumulable(AccumulablePlan {
                    full_aggrs: vec![sum.clone()],
                    simple_aggrs: vec![AggrWithIndex::new(sum.clone(), 0, 0)],
                    distinct_aggrs: vec![],
                }),
            },
        };

        assert_eq!(actual, expected);
    }

    #[tokio::test]
    async fn test_distinct_number() {
        let actual = flow_plan_for("SELECT DISTINCT number FROM numbers").await;

        let expected_schema =
            RelationType::new(vec![ColumnType::new(CDT::uint32_datatype(), false)])
                .with_key(vec![0])
                .into_named(vec![Some("number".to_string())]);

        let expected = TypedPlan {
            schema: expected_schema,
            plan: Plan::Reduce {
                input: Box::new(numbers_source()),
                key_val_plan: KeyValPlan {
                    key_plan: single_value_plan(ScalarExpr::Column(0)).into_safe(),
                    val_plan: MapFilterProject::new(1)
                        .project(vec![0])
                        .unwrap()
                        .into_safe(),
                },
                reduce_plan: ReducePlan::Accumulable(AccumulablePlan {
                    full_aggrs: vec![],
                    simple_aggrs: vec![],
                    distinct_aggrs: vec![],
                }),
            },
        };

        assert_eq!(actual, expected);
    }

    #[tokio::test]
    async fn test_sum_group_by() {
        let actual =
            flow_plan_for("SELECT sum(number), number FROM numbers GROUP BY number").await;

        let sum = sum_u64_of_first_column();
        let casted_number =
            ScalarExpr::Column(0).call_unary(UnaryFunc::Cast(CDT::uint64_datatype()));

        // The reduce emits (key, sum); the projection on top swaps them to match the SELECT.
        let reduce = Plan::Reduce {
            input: Box::new(numbers_source()),
            key_val_plan: KeyValPlan {
                key_plan: single_value_plan(ScalarExpr::Column(0)).into_safe(),
                val_plan: single_value_plan(casted_number).into_safe(),
            },
            reduce_plan: ReducePlan::Accumulable(AccumulablePlan {
                full_aggrs: vec![sum.clone()],
                simple_aggrs: vec![AggrWithIndex::new(sum.clone(), 0, 0)],
                distinct_aggrs: vec![],
            }),
        }
        .with_types(
            RelationType::new(vec![
                ColumnType::new(CDT::uint32_datatype(), false),
                ColumnType::new(CDT::uint64_datatype(), true),
            ])
            .with_key(vec![0])
            .into_named(vec![Some("number".to_string()), None]),
        );

        let expected = TypedPlan {
            schema: RelationType::new(vec![
                ColumnType::new(CDT::uint64_datatype(), true),
                ColumnType::new(CDT::uint32_datatype(), false),
            ])
            .with_key(vec![1])
            .into_named(vec![
                Some("sum(numbers.number)".to_string()),
                Some("number".to_string()),
            ]),
            plan: Plan::Mfp {
                input: Box::new(reduce),
                mfp: MapFilterProject::new(2)
                    .map(vec![ScalarExpr::Column(1), ScalarExpr::Column(0)])
                    .unwrap()
                    .project(vec![2, 3])
                    .unwrap(),
            },
        };

        assert_eq!(actual, expected);
    }

    #[tokio::test]
    async fn test_sum_add() {
        let actual = flow_plan_for("SELECT sum(number+number) FROM numbers").await;

        let sum = sum_u64_of_first_column();
        let doubled_and_casted = ScalarExpr::Column(0)
            .call_binary(ScalarExpr::Column(0), BinaryFunc::AddUInt32)
            .call_unary(UnaryFunc::Cast(CDT::uint64_datatype()));

        let source = Plan::Mfp {
            input: Box::new(
                Plan::Get {
                    id: crate::expr::Id::Global(GlobalId::User(0)),
                }
                .with_types(numbers_desc()),
            ),
            mfp: MapFilterProject::new(1),
        }
        .with_types(numbers_desc());

        let expected = TypedPlan {
            schema: RelationType::new(vec![ColumnType::new(CDT::uint64_datatype(), true)])
                .into_named(vec![Some(
                    "sum(numbers.number + numbers.number)".to_string(),
                )]),
            plan: Plan::Reduce {
                input: Box::new(source),
                key_val_plan: KeyValPlan {
                    key_plan: MapFilterProject::new(1)
                        .project(vec![])
                        .unwrap()
                        .into_safe(),
                    val_plan: single_value_plan(doubled_and_casted).into_safe(),
                },
                reduce_plan: ReducePlan::Accumulable(AccumulablePlan {
                    full_aggrs: vec![sum.clone()],
                    simple_aggrs: vec![AggrWithIndex::new(sum.clone(), 0, 0)],
                    distinct_aggrs: vec![],
                }),
            },
        };

        assert_eq!(actual, expected);
    }

    #[tokio::test]
    async fn test_cast_max_min() {
        let actual = flow_plan_for(
            "SELECT (max(number) - min(number))/30.0, date_bin(INTERVAL '30 second', CAST(ts AS TimestampMillisecond)) as time_window from numbers_with_ts GROUP BY time_window",
        )
        .await;

        let aggr_exprs = vec![
            AggregateExpr {
                func: AggregateFunc::MaxUInt32,
                expr: ScalarExpr::Column(0),
                distinct: false,
            },
            AggregateExpr {
                func: AggregateFunc::MinUInt32,
                expr: ScalarExpr::Column(0),
                distinct: false,
            },
        ];

        // Serialized `date_bin` call together with the schema of its two arguments.
        let date_bin_fn = DfScalarFunction::try_from_raw_fn(RawDfScalarFn {
            f: BytesMut::from(
                b"\x08\x02\"\x0f\x1a\r\n\x0b\xa2\x02\x08\n\0\x12\x04\x10\x1e \t\"\n\x1a\x08\x12\x06\n\x04\x12\x02\x08\x01".as_ref(),
            ),
            input_schema: RelationType::new(vec![
                ColumnType::new(ConcreteDataType::interval_month_day_nano_datatype(), true),
                ColumnType::new(ConcreteDataType::timestamp_millisecond_datatype(), false),
            ])
            .into_unnamed(),
            extensions: FunctionExtensions::from_iter([
                (0, "subtract".to_string()),
                (1, "divide".to_string()),
                (2, "date_bin".to_string()),
                (3, "max".to_string()),
                (4, "min".to_string()),
            ]),
        })
        .await
        .unwrap();

        let time_window_key = ScalarExpr::CallDf {
            df_scalar_fn: date_bin_fn,
            exprs: vec![
                ScalarExpr::Literal(
                    Value::IntervalMonthDayNano(IntervalMonthDayNano::new(0, 0, 30000000000)),
                    CDT::interval_month_day_nano_datatype(),
                ),
                ScalarExpr::Column(1),
            ],
        };

        let source = Plan::Get {
            id: crate::expr::Id::Global(GlobalId::User(1)),
        }
        .with_types(
            RelationType::new(vec![
                ColumnType::new(ConcreteDataType::uint32_datatype(), false),
                ColumnType::new(ConcreteDataType::timestamp_millisecond_datatype(), false),
            ])
            .into_named(vec![Some("number".to_string()), Some("ts".to_string())]),
        )
        .mfp(MapFilterProject::new(2).into_safe())
        .unwrap();

        let reduce = Plan::Reduce {
            input: Box::new(source),
            key_val_plan: KeyValPlan {
                key_plan: MapFilterProject::new(2)
                    .map(vec![time_window_key])
                    .unwrap()
                    .project(vec![2])
                    .unwrap()
                    .into_safe(),
                val_plan: MapFilterProject::new(2).into_safe(),
            },
            reduce_plan: ReducePlan::Accumulable(AccumulablePlan {
                full_aggrs: aggr_exprs.clone(),
                simple_aggrs: vec![
                    AggrWithIndex::new(aggr_exprs[0].clone(), 0, 0),
                    AggrWithIndex::new(aggr_exprs[1].clone(), 0, 1),
                ],
                distinct_aggrs: vec![],
            }),
        }
        .with_types(
            RelationType::new(vec![
                ColumnType::new(ConcreteDataType::timestamp_millisecond_datatype(), true),
                ColumnType::new(ConcreteDataType::uint32_datatype(), true),
                ColumnType::new(ConcreteDataType::uint32_datatype(), true),
            ])
            .with_time_index(Some(0))
            .into_unnamed(),
        );

        let range_over_thirty = ScalarExpr::Column(1)
            .call_binary(ScalarExpr::Column(2), BinaryFunc::SubUInt32)
            .cast(CDT::float64_datatype())
            .call_binary(
                ScalarExpr::Literal(Value::from(30.0f64), CDT::float64_datatype()),
                BinaryFunc::DivFloat64,
            );

        let expected = TypedPlan {
            schema: RelationType::new(vec![
                ColumnType::new(CDT::float64_datatype(), true),
                ColumnType::new(CDT::timestamp_millisecond_datatype(), true),
            ])
            .with_time_index(Some(1))
            .into_named(vec![
                Some(
                    "max(numbers_with_ts.number) - min(numbers_with_ts.number) / Float64(30)"
                        .to_string(),
                ),
                Some("time_window".to_string()),
            ]),
            plan: Plan::Mfp {
                input: Box::new(reduce),
                mfp: MapFilterProject::new(3)
                    .map(vec![range_over_thirty, ScalarExpr::Column(0)])
                    .unwrap()
                    .project(vec![3, 4])
                    .unwrap(),
            },
        };

        assert_eq!(actual, expected);
    }
}