flow/
plan.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
//! This module contains the basic definitions of dataflow plans
//! that can be translated to hydro dataflow
17
18mod join;
19mod reduce;
20
21use std::collections::BTreeSet;
22
23use crate::error::Error;
24use crate::expr::{GlobalId, Id, LocalId, MapFilterProject, SafeMfpPlan, ScalarExpr, TypedExpr};
25use crate::plan::join::JoinPlan;
26pub(crate) use crate::plan::reduce::{AccumulablePlan, AggrWithIndex, KeyValPlan, ReducePlan};
27use crate::repr::{DiffRow, RelationDesc};
28
/// A plan for a dataflow component, together with the type of the relation it outputs.
#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd)]
pub struct TypedPlan {
    /// Output type of the relation.
    pub schema: RelationDesc,
    /// The untyped plan.
    pub plan: Plan,
}
37
38impl TypedPlan {
39    /// directly apply a mfp to the plan
40    pub fn mfp(self, mfp: SafeMfpPlan) -> Result<Self, Error> {
41        let new_type = self.schema.apply_mfp(&mfp)?;
42        let mfp = mfp.mfp;
43        let plan = match self.plan {
44            Plan::Mfp {
45                input,
46                mfp: old_mfp,
47            } => Plan::Mfp {
48                input,
49                mfp: MapFilterProject::compose(old_mfp, mfp)?,
50            },
51            _ => Plan::Mfp {
52                input: Box::new(self),
53                mfp,
54            },
55        };
56        Ok(TypedPlan {
57            schema: new_type,
58            plan,
59        })
60    }
61
62    /// project the plan to the given expressions
63    pub fn projection(self, exprs: Vec<TypedExpr>) -> Result<Self, Error> {
64        let input_arity = self.schema.typ.column_types.len();
65        let output_arity = exprs.len();
66
67        let (exprs, _expr_typs): (Vec<_>, Vec<_>) = exprs
68            .into_iter()
69            .map(|TypedExpr { expr, typ }| (expr, typ))
70            .unzip();
71        let mfp = MapFilterProject::new(input_arity)
72            .map(exprs)?
73            .project(input_arity..input_arity + output_arity)?
74            .into_safe();
75        let out_typ = self.schema.apply_mfp(&mfp)?;
76
77        let mfp = mfp.mfp;
78        // special case for mfp to compose when the plan is already mfp
79        let plan = match self.plan {
80            Plan::Mfp {
81                input,
82                mfp: old_mfp,
83            } => Plan::Mfp {
84                input,
85                mfp: MapFilterProject::compose(old_mfp, mfp)?,
86            },
87            _ => Plan::Mfp {
88                input: Box::new(self),
89                mfp,
90            },
91        };
92        Ok(TypedPlan {
93            schema: out_typ,
94            plan,
95        })
96    }
97
98    /// Add a new filter to the plan, will filter out the records that do not satisfy the filter
99    pub fn filter(self, filter: TypedExpr) -> Result<Self, Error> {
100        let typ = self.schema.clone();
101        let plan = match self.plan {
102            Plan::Mfp {
103                input,
104                mfp: old_mfp,
105            } => Plan::Mfp {
106                input,
107                mfp: old_mfp.filter(vec![filter.expr])?,
108            },
109            _ => Plan::Mfp {
110                input: Box::new(self),
111                mfp: MapFilterProject::new(typ.typ.column_types.len()).filter(vec![filter.expr])?,
112            },
113        };
114        Ok(TypedPlan { schema: typ, plan })
115    }
116}
117
/// Plan describes how to transform data in a dataflow.
///
/// This can be considered as a physical plan in dataflow, which describes how to transform data
/// in a streaming manner.
///
/// TODO(discord9): support `TableFunc` (by defining a FlatMap that maps 1 to n)
// NOTE: `Ord`/`PartialOrd` are derived, so the declaration order of the variants and of their
// fields defines how plans compare.
#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd)]
pub enum Plan {
    /// A constant collection of rows.
    Constant { rows: Vec<DiffRow> },
    /// Get CDC data from a source, be it an external reference to an existing source or an
    /// internal reference to a `Let` identifier.
    Get { id: Id },
    /// Create a temporary collection from the given `value`, and make this binding available
    /// only in the scope of `body`.
    ///
    /// Similar to this rust code snippet:
    /// ```rust, ignore
    /// {
    ///     let id = value;
    ///     body
    /// }
    /// ```
    Let {
        /// The local identifier that `value` is bound to.
        id: LocalId,
        /// The plan whose result is bound to `id`.
        value: Box<TypedPlan>,
        /// The plan in whose scope the binding is available.
        body: Box<TypedPlan>,
    },
    /// Map, Filter, and Project operators, chained together.
    Mfp {
        /// The input collection.
        input: Box<TypedPlan>,
        /// Linear operator to apply to each record.
        mfp: MapFilterProject,
    },
    /// Reduce operator, aggregation by key assembled from `KeyValPlan`.
    Reduce {
        /// The input collection.
        input: Box<TypedPlan>,
        /// A plan for changing input records into key, value pairs.
        key_val_plan: KeyValPlan,
        /// A plan for performing the reduce.
        ///
        /// The implementation of reduction has several different strategies based
        /// on the properties of the reduction, and the input itself.
        reduce_plan: ReducePlan,
    },
    /// A multiway relational equijoin, with fused map, filter, and projection.
    ///
    /// This stage performs a multiway join among `inputs`, using the equality
    /// constraints expressed in `plan`. The plan also describes the implementation
    /// strategy we will use, and any pushed down per-record work.
    Join {
        /// An ordered list of inputs that will be joined.
        inputs: Vec<TypedPlan>,
        /// Detailed information about the implementation of the join.
        ///
        /// This includes information about the implementation strategy, but also
        /// any map, filter, project work that we might follow the join with, but
        /// potentially pushed down into the implementation of the join.
        plan: JoinPlan,
    },
    /// Adds the contents of the input collections.
    ///
    /// Importantly, this is *multiset* union, so the multiplicities of records will
    /// add. This is in contrast to *set* union, where the multiplicities would be
    /// capped at one. A set union can be formed with `Union` followed by `Reduce`
    /// implementing the "distinct" operator.
    Union {
        /// The input collections.
        inputs: Vec<TypedPlan>,
        /// Whether to consolidate the output, e.g., cancel negated records.
        consolidate_output: bool,
    },
}
190
191impl Plan {
192    pub fn with_types(self, schema: RelationDesc) -> TypedPlan {
193        TypedPlan { schema, plan: self }
194    }
195}
196
197impl Plan {
198    /// Get nth expr using column ref
199    pub fn get_nth_expr(&self, n: usize) -> Option<ScalarExpr> {
200        match self {
201            Self::Mfp { mfp, .. } => mfp.get_nth_expr(n),
202            Self::Reduce { key_val_plan, .. } => key_val_plan.get_nth_expr(n),
203            _ => None,
204        }
205    }
206
207    /// Get the first input plan if exists
208    pub fn get_first_input_plan(&self) -> Option<&TypedPlan> {
209        match self {
210            Plan::Let { value, .. } => Some(value),
211            Plan::Mfp { input, .. } => Some(input),
212            Plan::Reduce { input, .. } => Some(input),
213            Plan::Join { inputs, .. } => inputs.first(),
214            Plan::Union { inputs, .. } => inputs.first(),
215            _ => None,
216        }
217    }
218
219    /// Get mutable ref to the first input plan if exists
220    pub fn get_mut_first_input_plan(&mut self) -> Option<&mut TypedPlan> {
221        match self {
222            Plan::Let { value, .. } => Some(value),
223            Plan::Mfp { input, .. } => Some(input),
224            Plan::Reduce { input, .. } => Some(input),
225            Plan::Join { inputs, .. } => inputs.first_mut(),
226            Plan::Union { inputs, .. } => inputs.first_mut(),
227            _ => None,
228        }
229    }
230
231    /// Find all the used collection in the plan
232    pub fn find_used_collection(&self) -> BTreeSet<GlobalId> {
233        fn recur_find_use(plan: &Plan, used: &mut BTreeSet<GlobalId>) {
234            match plan {
235                Plan::Get { id } => {
236                    match id {
237                        Id::Local(_) => (),
238                        Id::Global(g) => {
239                            used.insert(*g);
240                        }
241                    };
242                }
243                Plan::Let { value, body, .. } => {
244                    recur_find_use(&value.plan, used);
245                    recur_find_use(&body.plan, used);
246                }
247                Plan::Mfp { input, .. } => {
248                    recur_find_use(&input.plan, used);
249                }
250                Plan::Reduce { input, .. } => {
251                    recur_find_use(&input.plan, used);
252                }
253                Plan::Join { inputs, .. } => {
254                    for input in inputs {
255                        recur_find_use(&input.plan, used);
256                    }
257                }
258                Plan::Union { inputs, .. } => {
259                    for input in inputs {
260                        recur_find_use(&input.plan, used);
261                    }
262                }
263                _ => {}
264            }
265        }
266        let mut ret = Default::default();
267        recur_find_use(self, &mut ret);
268        ret
269    }
270}