// flow/transform/aggr.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use itertools::Itertools;
use snafu::OptionExt;
use substrait_proto::proto;
use substrait_proto::proto::aggregate_function::AggregationInvocation;
use substrait_proto::proto::aggregate_rel::{Grouping, Measure};
use substrait_proto::proto::function_argument::ArgType;

use crate::error::{Error, NotImplementedSnafu, PlanSnafu};
use crate::expr::{
    AggregateExpr, AggregateFunc, MapFilterProject, ScalarExpr, TypedExpr, UnaryFunc,
};
use crate::plan::{AccumulablePlan, AggrWithIndex, KeyValPlan, Plan, ReducePlan, TypedPlan};
use crate::repr::{ColumnType, RelationDesc, RelationType};
use crate::transform::{FlownodeContext, FunctionExtensions, substrait_proto};
29
30impl TypedExpr {
31    /// Allow `deprecated` due to the usage of deprecated grouping_expressions on datafusion to substrait side
32    #[allow(deprecated)]
33    async fn from_substrait_agg_grouping(
34        ctx: &mut FlownodeContext,
35        grouping_expressions: &[proto::Expression],
36        groupings: &[Grouping],
37        typ: &RelationDesc,
38        extensions: &FunctionExtensions,
39    ) -> Result<Vec<TypedExpr>, Error> {
40        let _ = ctx;
41        let mut group_expr = vec![];
42        match groupings.len() {
43            1 => {
44                // handle case when deprecated grouping_expressions is referenced by index is empty
45                let expressions: Box<dyn Iterator<Item = &proto::Expression> + Send> = if groupings
46                    [0]
47                .expression_references
48                .is_empty()
49                {
50                    Box::new(groupings[0].grouping_expressions.iter())
51                } else {
52                    if groupings[0]
53                        .expression_references
54                        .iter()
55                        .any(|idx| *idx as usize >= grouping_expressions.len())
56                    {
57                        return PlanSnafu {
58                            reason: format!("Invalid grouping expression reference: {:?} for grouping expr: {:?}", 
59                            groupings[0].expression_references,
60                            grouping_expressions
61                        ),
62                        }.fail()?;
63                    }
64                    Box::new(
65                        groupings[0]
66                            .expression_references
67                            .iter()
68                            .map(|idx| &grouping_expressions[*idx as usize]),
69                    )
70                };
71                for e in expressions {
72                    let x = TypedExpr::from_substrait_rex(e, typ, extensions).await?;
73                    group_expr.push(x);
74                }
75            }
76            _ => {
77                return not_impl_err!(
78                    "Grouping sets not support yet, use union all with group by instead."
79                );
80            }
81        };
82        Ok(group_expr)
83    }
84}
85
86impl AggregateExpr {
87    /// Convert list of `Measure` into Flow's AggregateExpr
88    ///
89    /// Return both the AggregateExpr and a MapFilterProject that is the final output of the aggregate function
90    async fn from_substrait_agg_measures(
91        ctx: &mut FlownodeContext,
92        measures: &[Measure],
93        typ: &RelationDesc,
94        extensions: &FunctionExtensions,
95    ) -> Result<Vec<AggregateExpr>, Error> {
96        let _ = ctx;
97        let mut all_aggr_exprs = vec![];
98
99        for m in measures {
100            let filter = match m
101                .filter
102                .as_ref()
103                .map(|fil| TypedExpr::from_substrait_rex(fil, typ, extensions))
104            {
105                Some(fut) => Some(fut.await),
106                None => None,
107            }
108            .transpose()?;
109
110            let aggr_expr = match &m.measure {
111                Some(f) => {
112                    let distinct = match f.invocation {
113                        _ if f.invocation == AggregationInvocation::Distinct as i32 => true,
114                        _ if f.invocation == AggregationInvocation::All as i32 => false,
115                        _ => false,
116                    };
117                    AggregateExpr::from_substrait_agg_func(
118                        f, typ, extensions, &filter, // TODO(discord9): impl order_by
119                        &None, distinct,
120                    )
121                    .await?
122                }
123                None => {
124                    return not_impl_err!("Aggregate without aggregate function is not supported");
125                }
126            };
127
128            all_aggr_exprs.extend(aggr_expr);
129        }
130
131        Ok(all_aggr_exprs)
132    }
133
134    /// Convert AggregateFunction into Flow's AggregateExpr
135    ///
136    /// the returned value is a tuple of AggregateExpr and a optional ScalarExpr that if exist is the final output of the aggregate function
137    /// since aggr functions like `avg` need to be transform to `sum(x)/cast(count(x) as x_type)`
138    pub async fn from_substrait_agg_func(
139        f: &proto::AggregateFunction,
140        input_schema: &RelationDesc,
141        extensions: &FunctionExtensions,
142        filter: &Option<TypedExpr>,
143        order_by: &Option<Vec<TypedExpr>>,
144        distinct: bool,
145    ) -> Result<Vec<AggregateExpr>, Error> {
146        // TODO(discord9): impl filter
147        let _ = filter;
148        let _ = order_by;
149        let mut args = vec![];
150        for arg in &f.arguments {
151            let arg_expr = match &arg.arg_type {
152                Some(ArgType::Value(e)) => {
153                    TypedExpr::from_substrait_rex(e, input_schema, extensions).await
154                }
155                _ => not_impl_err!("Aggregated function argument non-Value type not supported"),
156            }?;
157            args.push(arg_expr);
158        }
159
160        if args.len() != 1 {
161            let fn_name = extensions.get(&f.function_reference).cloned();
162            return not_impl_err!(
163                "Aggregated function (name={:?}) with multiple arguments is not supported",
164                fn_name
165            );
166        }
167
168        let arg = if let Some(first) = args.first() {
169            first
170        } else {
171            return not_impl_err!("Aggregated function without arguments is not supported");
172        };
173
174        let fn_name = extensions
175            .get(&f.function_reference)
176            .cloned()
177            .map(|s| s.to_lowercase());
178
179        match fn_name.as_ref().map(|s| s.as_ref()) {
180            Some(function_name) => {
181                let func = AggregateFunc::from_str_and_type(
182                    function_name,
183                    Some(arg.typ.scalar_type.clone()),
184                )?;
185                let exprs = vec![AggregateExpr {
186                    func,
187                    expr: arg.expr.clone(),
188                    distinct,
189                }];
190                Ok(exprs)
191            }
192            None => not_impl_err!(
193                "Aggregated function not found: function anchor = {:?}",
194                f.function_reference
195            ),
196        }
197    }
198}
199
200impl KeyValPlan {
201    /// Generate KeyValPlan from AggregateExpr and group_exprs
202    ///
203    /// will also change aggregate expr to use column ref if necessary
204    fn from_substrait_gen_key_val_plan(
205        aggr_exprs: &mut [AggregateExpr],
206        group_exprs: &[TypedExpr],
207        input_arity: usize,
208    ) -> Result<KeyValPlan, Error> {
209        let group_expr_val = group_exprs
210            .iter()
211            .map(|expr| expr.expr.clone())
212            .collect_vec();
213        let output_arity = group_expr_val.len();
214        let key_plan = MapFilterProject::new(input_arity)
215            .map(group_expr_val)?
216            .project(input_arity..input_arity + output_arity)?;
217
218        // val_plan is extracted from aggr_exprs to give aggr function it's necessary input
219        // and since aggr func need inputs that is column ref, we just add a prefix mfp to transform any expr that is not into a column ref
220        let val_plan = {
221            let need_mfp = aggr_exprs.iter().any(|agg| agg.expr.as_column().is_none());
222            if need_mfp {
223                // create mfp from aggr_expr, and modify aggr_expr to use the output column of mfp
224                let input_exprs = aggr_exprs
225                    .iter_mut()
226                    .enumerate()
227                    .map(|(idx, aggr)| {
228                        let ret = aggr.expr.clone();
229                        aggr.expr = ScalarExpr::Column(idx);
230                        ret
231                    })
232                    .collect_vec();
233                let aggr_arity = aggr_exprs.len();
234
235                MapFilterProject::new(input_arity)
236                    .map(input_exprs)?
237                    .project(input_arity..input_arity + aggr_arity)?
238            } else {
239                // simply take all inputs as value
240                MapFilterProject::new(input_arity)
241            }
242        };
243        Ok(KeyValPlan {
244            key_plan: key_plan.into_safe(),
245            val_plan: val_plan.into_safe(),
246        })
247    }
248}
249
250/// find out the column that should be time index in group exprs(which is all columns that should be keys)
251/// TODO(discord9): better ways to assign time index
252/// for now, it will found the first column that is timestamp or has a tumble window floor function
253fn find_time_index_in_group_exprs(group_exprs: &[TypedExpr]) -> Option<usize> {
254    group_exprs.iter().position(|expr| {
255        matches!(
256            &expr.expr,
257            ScalarExpr::CallUnary {
258                func: UnaryFunc::TumbleWindowFloor { .. },
259                expr: _
260            }
261        ) || expr.typ.scalar_type.is_timestamp()
262    })
263}
264
265impl TypedPlan {
266    /// Convert AggregateRel into Flow's TypedPlan
267    ///
268    /// The output of aggr plan is:
269    ///
270    /// <group_exprs>..<aggr_exprs>
271    #[async_recursion::async_recursion]
272    pub async fn from_substrait_agg_rel(
273        ctx: &mut FlownodeContext,
274        agg: &proto::AggregateRel,
275        extensions: &FunctionExtensions,
276    ) -> Result<TypedPlan, Error> {
277        let input = if let Some(input) = agg.input.as_ref() {
278            TypedPlan::from_substrait_rel(ctx, input, extensions).await?
279        } else {
280            return not_impl_err!("Aggregate without an input is not supported");
281        };
282
283        let group_exprs = TypedExpr::from_substrait_agg_grouping(
284            ctx,
285            &agg.grouping_expressions,
286            &agg.groupings,
287            &input.schema,
288            extensions,
289        )
290        .await?;
291
292        let time_index = find_time_index_in_group_exprs(&group_exprs);
293
294        let mut aggr_exprs = AggregateExpr::from_substrait_agg_measures(
295            ctx,
296            &agg.measures,
297            &input.schema,
298            extensions,
299        )
300        .await?;
301
302        let key_val_plan = KeyValPlan::from_substrait_gen_key_val_plan(
303            &mut aggr_exprs,
304            &group_exprs,
305            input.schema.typ.column_types.len(),
306        )?;
307
308        // output type is group_exprs + aggr_exprs
309        let output_type = {
310            let mut output_types = Vec::new();
311            // give best effort to get column name
312            let mut output_names = Vec::new();
313
314            // first append group_expr as key, then aggr_expr as value
315            for expr in group_exprs.iter() {
316                output_types.push(expr.typ.clone());
317                let col_name = match &expr.expr {
318                    ScalarExpr::Column(col) => input.schema.get_name(*col).clone(),
319                    // TODO(discord9): impl& use ScalarExpr.display_name, which recursively build expr's name
320                    _ => None,
321                };
322                output_names.push(col_name)
323            }
324
325            for aggr in &aggr_exprs {
326                output_types.push(ColumnType::new_nullable(
327                    aggr.func.signature().output.clone(),
328                ));
329                // TODO(discord9): find a clever way to name them?
330                output_names.push(None);
331            }
332            // TODO(discord9): try best to get time
333            if group_exprs.is_empty() {
334                RelationType::new(output_types)
335            } else {
336                RelationType::new(output_types).with_key((0..group_exprs.len()).collect_vec())
337            }
338            .with_time_index(time_index)
339            .into_named(output_names)
340        };
341
342        // copy aggr_exprs to full_aggrs, and split them into simple_aggrs and distinct_aggrs
343        // also set them input/output column
344        let full_aggrs = aggr_exprs;
345        let mut simple_aggrs = Vec::new();
346        let mut distinct_aggrs = Vec::new();
347        for (output_column, aggr_expr) in full_aggrs.iter().enumerate() {
348            let input_column = aggr_expr.expr.as_column().with_context(|| PlanSnafu {
349                reason: "Expect aggregate argument to be transformed into a column at this point",
350            })?;
351            if aggr_expr.distinct {
352                distinct_aggrs.push(AggrWithIndex::new(
353                    aggr_expr.clone(),
354                    input_column,
355                    output_column,
356                ));
357            } else {
358                simple_aggrs.push(AggrWithIndex::new(
359                    aggr_expr.clone(),
360                    input_column,
361                    output_column,
362                ));
363            }
364        }
365        let accum_plan = AccumulablePlan {
366            full_aggrs,
367            simple_aggrs,
368            distinct_aggrs,
369        };
370        let plan = Plan::Reduce {
371            input: Box::new(input),
372            key_val_plan,
373            reduce_plan: ReducePlan::Accumulable(accum_plan),
374        };
375        // FIX(discord9): deal with key first
376        return Ok(TypedPlan {
377            schema: output_type,
378            plan,
379        });
380    }
381}
382
383#[cfg(test)]
384mod test {
385
386    use bytes::BytesMut;
387    use common_time::IntervalMonthDayNano;
388    use datatypes::data_type::ConcreteDataType as CDT;
389    use datatypes::prelude::ConcreteDataType;
390    use datatypes::value::Value;
391    use pretty_assertions::assert_eq;
392
393    use super::*;
394    use crate::expr::{BinaryFunc, DfScalarFunction, GlobalId, RawDfScalarFn};
395    use crate::plan::{Plan, TypedPlan};
396    use crate::repr::{ColumnType, RelationType};
397    use crate::transform::test::{create_test_ctx, create_test_query_engine, sql_to_substrait};
398
    // Verify that a global aggregate (`sum` with no GROUP BY) lowers to a
    // Reduce plan with an empty key plan and a cast-to-u64 value plan.
    #[tokio::test]
    async fn test_sum() {
        let engine = create_test_query_engine();
        let sql = "SELECT sum(number) FROM numbers";
        let plan = sql_to_substrait(engine.clone(), sql).await;

        let mut ctx = create_test_ctx();
        let flow_plan = TypedPlan::from_substrait_plan(&mut ctx, &plan).await;

        let aggr_expr = AggregateExpr {
            func: AggregateFunc::SumUInt64,
            expr: ScalarExpr::Column(0),
            distinct: false,
        };
        let expected = TypedPlan {
            schema: RelationType::new(vec![ColumnType::new(CDT::uint64_datatype(), true)])
                .into_named(vec![Some("sum(numbers.number)".to_string())]),
            plan: Plan::Reduce {
                input: Box::new(
                    Plan::Get {
                        id: crate::expr::Id::Global(GlobalId::User(0)),
                    }
                    .with_types(
                        RelationType::new(vec![ColumnType::new(
                            ConcreteDataType::uint32_datatype(),
                            false,
                        )])
                        .into_named(vec![Some("number".to_string())]),
                    )
                    .mfp(MapFilterProject::new(1).into_safe())
                    .unwrap(),
                ),
                key_val_plan: KeyValPlan {
                    // no group-by keys: key plan projects to an empty row
                    key_plan: MapFilterProject::new(1)
                        .project(vec![])
                        .unwrap()
                        .into_safe(),
                    // the u32 input column is widened to u64 before accumulation
                    val_plan: MapFilterProject::new(1)
                        .map(vec![
                            ScalarExpr::Column(0)
                                .call_unary(UnaryFunc::Cast(CDT::uint64_datatype())),
                        ])
                        .unwrap()
                        .project(vec![1])
                        .unwrap()
                        .into_safe(),
                },
                reduce_plan: ReducePlan::Accumulable(AccumulablePlan {
                    full_aggrs: vec![aggr_expr.clone()],
                    simple_aggrs: vec![AggrWithIndex::new(aggr_expr.clone(), 0, 0)],
                    distinct_aggrs: vec![],
                }),
            },
        };
        assert_eq!(flow_plan.unwrap(), expected);
    }
455
    // Verify that `SELECT DISTINCT` lowers to a Reduce keyed on the selected
    // column, with no aggregate functions at all.
    #[tokio::test]
    async fn test_distinct_number() {
        let engine = create_test_query_engine();
        let sql = "SELECT DISTINCT number FROM numbers";
        let plan = sql_to_substrait(engine.clone(), sql).await;

        let mut ctx = create_test_ctx();
        let flow_plan = TypedPlan::from_substrait_plan(&mut ctx, &plan)
            .await
            .unwrap();

        let expected = TypedPlan {
            schema: RelationType::new(vec![
                ColumnType::new(CDT::uint32_datatype(), false), // col number
            ])
            .with_key(vec![0])
            .into_named(vec![Some("number".to_string())]),
            plan: Plan::Reduce {
                input: Box::new(
                    Plan::Get {
                        id: crate::expr::Id::Global(GlobalId::User(0)),
                    }
                    .with_types(
                        RelationType::new(vec![ColumnType::new(
                            ConcreteDataType::uint32_datatype(),
                            false,
                        )])
                        .into_named(vec![Some("number".to_string())]),
                    )
                    .mfp(MapFilterProject::new(1).into_safe())
                    .unwrap(),
                ),
                key_val_plan: KeyValPlan {
                    // the distinct column becomes the reduce key
                    key_plan: MapFilterProject::new(1)
                        .map(vec![ScalarExpr::Column(0)])
                        .unwrap()
                        .project(vec![1])
                        .unwrap()
                        .into_safe(),
                    val_plan: MapFilterProject::new(1)
                        .project(vec![0])
                        .unwrap()
                        .into_safe(),
                },
                // no aggregate functions: distinct is purely key-based reduction
                reduce_plan: ReducePlan::Accumulable(AccumulablePlan {
                    full_aggrs: vec![],
                    simple_aggrs: vec![],
                    distinct_aggrs: vec![],
                }),
            },
        };

        assert_eq!(flow_plan, expected);
    }
510
    // Verify `sum(number), number ... GROUP BY number`: the Reduce emits
    // <group key>..<aggregate>, and a trailing Mfp reorders the columns to
    // match the SELECT list (sum first, then number).
    #[tokio::test]
    async fn test_sum_group_by() {
        let engine = create_test_query_engine();
        let sql = "SELECT sum(number), number FROM numbers GROUP BY number";
        let plan = sql_to_substrait(engine.clone(), sql).await;

        let mut ctx = create_test_ctx();
        let flow_plan = TypedPlan::from_substrait_plan(&mut ctx, &plan)
            .await
            .unwrap();

        let aggr_expr = AggregateExpr {
            func: AggregateFunc::SumUInt64,
            expr: ScalarExpr::Column(0),
            distinct: false,
        };
        let expected = TypedPlan {
            schema: RelationType::new(vec![
                ColumnType::new(CDT::uint64_datatype(), true), // col sum(number)
                ColumnType::new(CDT::uint32_datatype(), false), // col number
            ])
            .with_key(vec![1])
            .into_named(vec![
                Some("sum(numbers.number)".to_string()),
                Some("number".to_string()),
            ]),
            plan: Plan::Mfp {
                input: Box::new(
                    Plan::Reduce {
                        input: Box::new(
                            Plan::Get {
                                id: crate::expr::Id::Global(GlobalId::User(0)),
                            }
                            .with_types(
                                RelationType::new(vec![ColumnType::new(
                                    ConcreteDataType::uint32_datatype(),
                                    false,
                                )])
                                .into_named(vec![Some("number".to_string())]),
                            )
                            .mfp(MapFilterProject::new(1).into_safe())
                            .unwrap(),
                        ),
                        key_val_plan: KeyValPlan {
                            // group key is the raw `number` column
                            key_plan: MapFilterProject::new(1)
                                .map(vec![ScalarExpr::Column(0)])
                                .unwrap()
                                .project(vec![1])
                                .unwrap()
                                .into_safe(),
                            // sum input widened to u64 before accumulation
                            val_plan: MapFilterProject::new(1)
                                .map(vec![
                                    ScalarExpr::Column(0)
                                        .call_unary(UnaryFunc::Cast(CDT::uint64_datatype())),
                                ])
                                .unwrap()
                                .project(vec![1])
                                .unwrap()
                                .into_safe(),
                        },
                        reduce_plan: ReducePlan::Accumulable(AccumulablePlan {
                            full_aggrs: vec![aggr_expr.clone()],
                            simple_aggrs: vec![AggrWithIndex::new(aggr_expr.clone(), 0, 0)],
                            distinct_aggrs: vec![],
                        }),
                    }
                    .with_types(
                        RelationType::new(vec![
                            ColumnType::new(CDT::uint32_datatype(), false), // col number
                            ColumnType::new(CDT::uint64_datatype(), true),  // col sum(number)
                        ])
                        .with_key(vec![0])
                        .into_named(vec![Some("number".to_string()), None]),
                    ),
                ),
                // swap (number, sum) -> (sum, number) to match the SELECT order
                mfp: MapFilterProject::new(2)
                    .map(vec![ScalarExpr::Column(1), ScalarExpr::Column(0)])
                    .unwrap()
                    .project(vec![2, 3])
                    .unwrap(),
            },
        };

        assert_eq!(flow_plan, expected);
    }
596
    // Verify that an aggregate over a computed expression (`sum(number+number)`)
    // pushes the addition and the u64 cast into the value plan, so the
    // accumulator itself only reads a plain column.
    #[tokio::test]
    async fn test_sum_add() {
        let engine = create_test_query_engine();
        let sql = "SELECT sum(number+number) FROM numbers";
        let plan = sql_to_substrait(engine.clone(), sql).await;

        let mut ctx = create_test_ctx();
        let flow_plan = TypedPlan::from_substrait_plan(&mut ctx, &plan).await;

        let aggr_expr = AggregateExpr {
            func: AggregateFunc::SumUInt64,
            expr: ScalarExpr::Column(0),
            distinct: false,
        };
        let expected = TypedPlan {
            schema: RelationType::new(vec![ColumnType::new(CDT::uint64_datatype(), true)])
                .into_named(vec![Some(
                    "sum(numbers.number + numbers.number)".to_string(),
                )]),
            plan: Plan::Reduce {
                input: Box::new(
                    Plan::Mfp {
                        input: Box::new(
                            Plan::Get {
                                id: crate::expr::Id::Global(GlobalId::User(0)),
                            }
                            .with_types(
                                RelationType::new(vec![ColumnType::new(
                                    ConcreteDataType::uint32_datatype(),
                                    false,
                                )])
                                .into_named(vec![Some("number".to_string())]),
                            ),
                        ),
                        mfp: MapFilterProject::new(1),
                    }
                    .with_types(
                        RelationType::new(vec![ColumnType::new(
                            ConcreteDataType::uint32_datatype(),
                            false,
                        )])
                        .into_named(vec![Some("number".to_string())]),
                    ),
                ),
                key_val_plan: KeyValPlan {
                    // no group-by keys
                    key_plan: MapFilterProject::new(1)
                        .project(vec![])
                        .unwrap()
                        .into_safe(),
                    // number + number, widened to u64 for the sum accumulator
                    val_plan: MapFilterProject::new(1)
                        .map(vec![
                            ScalarExpr::Column(0)
                                .call_binary(ScalarExpr::Column(0), BinaryFunc::AddUInt32)
                                .call_unary(UnaryFunc::Cast(CDT::uint64_datatype())),
                        ])
                        .unwrap()
                        .project(vec![1])
                        .unwrap()
                        .into_safe(),
                },
                reduce_plan: ReducePlan::Accumulable(AccumulablePlan {
                    full_aggrs: vec![aggr_expr.clone()],
                    simple_aggrs: vec![AggrWithIndex::new(aggr_expr.clone(), 0, 0)],
                    distinct_aggrs: vec![],
                }),
            },
        };
        assert_eq!(flow_plan.unwrap(), expected);
    }
666
    // Verify a query with two aggregates (max/min), arithmetic on the aggregate
    // results, and a `date_bin` time window as the group key: the window
    // expression becomes the reduce key (and time index), and the final Mfp
    // computes (max - min) / 30.0 and reorders the output columns.
    #[tokio::test]
    async fn test_cast_max_min() {
        let engine = create_test_query_engine();
        let sql = "SELECT (max(number) - min(number))/30.0, date_bin(INTERVAL '30 second', CAST(ts AS TimestampMillisecond)) as time_window from numbers_with_ts GROUP BY time_window";
        let plan = sql_to_substrait(engine.clone(), sql).await;

        let mut ctx = create_test_ctx();
        let flow_plan = TypedPlan::from_substrait_plan(&mut ctx, &plan).await;

        let aggr_exprs = vec![
            AggregateExpr {
                func: AggregateFunc::MaxUInt32,
                expr: ScalarExpr::Column(0),
                distinct: false,
            },
            AggregateExpr {
                func: AggregateFunc::MinUInt32,
                expr: ScalarExpr::Column(0),
                distinct: false,
            },
        ];
        let expected = TypedPlan {
            schema: RelationType::new(vec![
                ColumnType::new(CDT::float64_datatype(), true),
                ColumnType::new(CDT::timestamp_millisecond_datatype(), true),
            ])
            .with_time_index(Some(1))
            .into_named(vec![
                Some(
                    "max(numbers_with_ts.number) - min(numbers_with_ts.number) / Float64(30)"
                        .to_string(),
                ),
                Some("time_window".to_string()),
            ]),
            plan: Plan::Mfp {
                input: Box::new(
                    Plan::Reduce {
                        input: Box::new(
                            Plan::Get {
                                id: crate::expr::Id::Global(GlobalId::User(1)),
                            }
                            .with_types(
                                RelationType::new(vec![
                                    ColumnType::new(ConcreteDataType::uint32_datatype(), false),
                                    ColumnType::new(ConcreteDataType::timestamp_millisecond_datatype(), false),
                                ])
                                .into_named(vec![
                                    Some("number".to_string()),
                                    Some("ts".to_string()),
                                ]),
                            )
                            .mfp(MapFilterProject::new(2).into_safe())
                            .unwrap(),
                        ),

                        key_val_plan: KeyValPlan {
                            // group key: date_bin(interval '30s', ts), evaluated via a
                            // serialized datafusion scalar function (the bytes below
                            // are the encoded substrait expression)
                            key_plan: MapFilterProject::new(2)
                                .map(vec![ScalarExpr::CallDf {
                                    df_scalar_fn: DfScalarFunction::try_from_raw_fn(
                                        RawDfScalarFn {
                                            f: BytesMut::from(
                                                b"\x08\x02\"\x0f\x1a\r\n\x0b\xa2\x02\x08\n\0\x12\x04\x10\x1e \t\"\n\x1a\x08\x12\x06\n\x04\x12\x02\x08\x01".as_ref(),
                                            ),
                                            input_schema: RelationType::new(vec![ColumnType::new(
                                                ConcreteDataType::interval_month_day_nano_datatype(),
                                                true,
                                            ),ColumnType::new(
                                                ConcreteDataType::timestamp_millisecond_datatype(),
                                                false,
                                            )])
                                            .into_unnamed(),
                                            extensions: FunctionExtensions::from_iter([
                                                    (0, "subtract".to_string()),
                                                    (1, "divide".to_string()),
                                                    (2, "date_bin".to_string()),
                                                    (3, "max".to_string()),
                                                    (4, "min".to_string()),
                                                ]),
                                        },
                                    )
                                    .await
                                    .unwrap(),
                                    exprs: vec![
                                        ScalarExpr::Literal(
                                            Value::IntervalMonthDayNano(IntervalMonthDayNano::new(0, 0, 30000000000)),
                                            CDT::interval_month_day_nano_datatype()
                                        ),
                                        ScalarExpr::Column(1)
                                        ],
                                }])
                                .unwrap()
                                .project(vec![2])
                                .unwrap()
                                .into_safe(),
                            val_plan: MapFilterProject::new(2)
                                .into_safe(),
                        },
                        // both aggregates read column 0 (`number`)
                        reduce_plan: ReducePlan::Accumulable(AccumulablePlan {
                            full_aggrs: aggr_exprs.clone(),
                            simple_aggrs: vec![AggrWithIndex::new(aggr_exprs[0].clone(), 0, 0),
                            AggrWithIndex::new(aggr_exprs[1].clone(), 0, 1)],
                            distinct_aggrs: vec![],
                        }),
                    }
                    .with_types(
                        RelationType::new(vec![
                            ColumnType::new(
                                ConcreteDataType::timestamp_millisecond_datatype(),
                                true,
                            ), // time_window
                            ColumnType::new(ConcreteDataType::uint32_datatype(), true), // max
                            ColumnType::new(ConcreteDataType::uint32_datatype(), true), // min
                        ])
                        .with_time_index(Some(0))
                        .into_unnamed(),
                    ),
                ),
                // (max - min) cast to f64, divided by 30.0; then reorder so the
                // computed value precedes the time window
                mfp: MapFilterProject::new(3)
                    .map(vec![
                        ScalarExpr::Column(1)
                            .call_binary(ScalarExpr::Column(2), BinaryFunc::SubUInt32)
                            .cast(CDT::float64_datatype())
                            .call_binary(
                                ScalarExpr::Literal(Value::from(30.0f64), CDT::float64_datatype()),
                                BinaryFunc::DivFloat64,
                            ),
                        ScalarExpr::Column(0),
                    ])
                    .unwrap()
                    .project(vec![3, 4])
                    .unwrap(),
            },
        };

        assert_eq!(flow_plan.unwrap(), expected);
    }
803}