flow/plan/
reduce.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use crate::expr::{AggregateExpr, SafeMfpPlan, ScalarExpr};
16
17/// Describe how to extract key-value pair from a `Row`
18#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd)]
19pub struct KeyValPlan {
20    /// Extract key from row
21    pub key_plan: SafeMfpPlan,
22    /// Extract value from row
23    pub val_plan: SafeMfpPlan,
24}
25
26impl KeyValPlan {
27    /// Get nth expr using column ref
28    pub fn get_nth_expr(&self, n: usize) -> Option<ScalarExpr> {
29        self.key_plan.get_nth_expr(n).or_else(|| {
30            self.val_plan
31                .get_nth_expr(n - self.key_plan.projection.len())
32        })
33    }
34}
35
36/// TODO(discord9): def&impl of Hierarchical aggregates(for min/max with support to deletion) and
37/// basic aggregates(for other aggregate functions) and mixed aggregate
38#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd)]
39pub enum ReducePlan {
40    /// Plan for not computing any aggregations, just determining the set of
41    /// distinct keys.
42    Distinct,
43    /// Plan for computing only accumulable aggregations.
44    /// Including simple functions like `sum`, `count`, `min/max`(without deletion)
45    Accumulable(AccumulablePlan),
46}
47
48/// Accumulable plan for the execution of a reduction.
49#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd)]
50pub struct AccumulablePlan {
51    /// All of the aggregations we were asked to compute, stored
52    /// in order.
53    pub full_aggrs: Vec<AggregateExpr>,
54    /// All of the non-distinct accumulable aggregates.
55    /// Each element represents:
56    /// (index of aggr output, index of value among inputs, aggr expr)
57    /// These will all be rendered together in one dataflow fragment.
58    ///
59    /// Invariant: the output index is the index of the aggregation in `full_aggrs`
60    /// which means output index is always smaller than the length of `full_aggrs`
61    pub simple_aggrs: Vec<AggrWithIndex>,
62    /// Same as `simple_aggrs` but for all of the `DISTINCT` accumulable aggregations.
63    pub distinct_aggrs: Vec<AggrWithIndex>,
64}
65
66/// Invariant: the output index is the index of the aggregation in `full_aggrs`
67/// which means output index is always smaller than the length of `full_aggrs`
68#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd)]
69pub struct AggrWithIndex {
70    /// aggregation expression
71    pub expr: AggregateExpr,
72    /// index of aggr input among input row
73    pub input_idx: usize,
74    /// index of aggr output among output row
75    pub output_idx: usize,
76}
77
78impl AggrWithIndex {
79    /// Create a new `AggrWithIndex`
80    pub fn new(expr: AggregateExpr, input_idx: usize, output_idx: usize) -> Self {
81        Self {
82            expr,
83            input_idx,
84            output_idx,
85        }
86    }
87}