flow/plan/reduce.rs
1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use crate::expr::{AggregateExpr, SafeMfpPlan, ScalarExpr};
16
17/// Describe how to extract key-value pair from a `Row`
18#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd)]
19pub struct KeyValPlan {
20 /// Extract key from row
21 pub key_plan: SafeMfpPlan,
22 /// Extract value from row
23 pub val_plan: SafeMfpPlan,
24}
25
26impl KeyValPlan {
27 /// Get nth expr using column ref
28 pub fn get_nth_expr(&self, n: usize) -> Option<ScalarExpr> {
29 self.key_plan.get_nth_expr(n).or_else(|| {
30 self.val_plan
31 .get_nth_expr(n - self.key_plan.projection.len())
32 })
33 }
34}
35
36/// TODO(discord9): def&impl of Hierarchical aggregates(for min/max with support to deletion) and
37/// basic aggregates(for other aggregate functions) and mixed aggregate
38#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd)]
39pub enum ReducePlan {
40 /// Plan for not computing any aggregations, just determining the set of
41 /// distinct keys.
42 Distinct,
43 /// Plan for computing only accumulable aggregations.
44 /// Including simple functions like `sum`, `count`, `min/max`(without deletion)
45 Accumulable(AccumulablePlan),
46}
47
48/// Accumulable plan for the execution of a reduction.
49#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd)]
50pub struct AccumulablePlan {
51 /// All of the aggregations we were asked to compute, stored
52 /// in order.
53 pub full_aggrs: Vec<AggregateExpr>,
54 /// All of the non-distinct accumulable aggregates.
55 /// Each element represents:
56 /// (index of aggr output, index of value among inputs, aggr expr)
57 /// These will all be rendered together in one dataflow fragment.
58 ///
59 /// Invariant: the output index is the index of the aggregation in `full_aggrs`
60 /// which means output index is always smaller than the length of `full_aggrs`
61 pub simple_aggrs: Vec<AggrWithIndex>,
62 /// Same as `simple_aggrs` but for all of the `DISTINCT` accumulable aggregations.
63 pub distinct_aggrs: Vec<AggrWithIndex>,
64}
65
66/// Invariant: the output index is the index of the aggregation in `full_aggrs`
67/// which means output index is always smaller than the length of `full_aggrs`
68#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd)]
69pub struct AggrWithIndex {
70 /// aggregation expression
71 pub expr: AggregateExpr,
72 /// index of aggr input among input row
73 pub input_idx: usize,
74 /// index of aggr output among output row
75 pub output_idx: usize,
76}
77
78impl AggrWithIndex {
79 /// Create a new `AggrWithIndex`
80 pub fn new(expr: AggregateExpr, input_idx: usize, output_idx: usize) -> Self {
81 Self {
82 expr,
83 input_idx,
84 output_idx,
85 }
86 }
87}