flow/plan.rs
1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
//! This module contains the basic definitions for dataflow's plan
//! that can be translated to hydro dataflow
17
18mod join;
19mod reduce;
20
21use std::collections::BTreeSet;
22
23use crate::error::Error;
24use crate::expr::{GlobalId, Id, LocalId, MapFilterProject, SafeMfpPlan, ScalarExpr, TypedExpr};
25use crate::plan::join::JoinPlan;
26pub(crate) use crate::plan::reduce::{AccumulablePlan, AggrWithIndex, KeyValPlan, ReducePlan};
27use crate::repr::{DiffRow, RelationDesc};
28
29/// A plan for a dataflow component. But with type to indicate the output type of the relation.
30#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd)]
31pub struct TypedPlan {
32 /// output type of the relation
33 pub schema: RelationDesc,
34 /// The untyped plan.
35 pub plan: Plan,
36}
37
38impl TypedPlan {
39 /// directly apply a mfp to the plan
40 pub fn mfp(self, mfp: SafeMfpPlan) -> Result<Self, Error> {
41 let new_type = self.schema.apply_mfp(&mfp)?;
42 let mfp = mfp.mfp;
43 let plan = match self.plan {
44 Plan::Mfp {
45 input,
46 mfp: old_mfp,
47 } => Plan::Mfp {
48 input,
49 mfp: MapFilterProject::compose(old_mfp, mfp)?,
50 },
51 _ => Plan::Mfp {
52 input: Box::new(self),
53 mfp,
54 },
55 };
56 Ok(TypedPlan {
57 schema: new_type,
58 plan,
59 })
60 }
61
62 /// project the plan to the given expressions
63 pub fn projection(self, exprs: Vec<TypedExpr>) -> Result<Self, Error> {
64 let input_arity = self.schema.typ.column_types.len();
65 let output_arity = exprs.len();
66
67 let (exprs, _expr_typs): (Vec<_>, Vec<_>) = exprs
68 .into_iter()
69 .map(|TypedExpr { expr, typ }| (expr, typ))
70 .unzip();
71 let mfp = MapFilterProject::new(input_arity)
72 .map(exprs)?
73 .project(input_arity..input_arity + output_arity)?
74 .into_safe();
75 let out_typ = self.schema.apply_mfp(&mfp)?;
76
77 let mfp = mfp.mfp;
78 // special case for mfp to compose when the plan is already mfp
79 let plan = match self.plan {
80 Plan::Mfp {
81 input,
82 mfp: old_mfp,
83 } => Plan::Mfp {
84 input,
85 mfp: MapFilterProject::compose(old_mfp, mfp)?,
86 },
87 _ => Plan::Mfp {
88 input: Box::new(self),
89 mfp,
90 },
91 };
92 Ok(TypedPlan {
93 schema: out_typ,
94 plan,
95 })
96 }
97
98 /// Add a new filter to the plan, will filter out the records that do not satisfy the filter
99 pub fn filter(self, filter: TypedExpr) -> Result<Self, Error> {
100 let typ = self.schema.clone();
101 let plan = match self.plan {
102 Plan::Mfp {
103 input,
104 mfp: old_mfp,
105 } => Plan::Mfp {
106 input,
107 mfp: old_mfp.filter(vec![filter.expr])?,
108 },
109 _ => Plan::Mfp {
110 input: Box::new(self),
111 mfp: MapFilterProject::new(typ.typ.column_types.len()).filter(vec![filter.expr])?,
112 },
113 };
114 Ok(TypedPlan { schema: typ, plan })
115 }
116}
117
118/// TODO(discord9): support `TableFunc`(by define FlatMap that map 1 to n)
119/// Plan describe how to transform data in dataflow
120///
121/// This can be considered as a physical plan in dataflow, which describe how to transform data in a streaming manner.
122#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd)]
123pub enum Plan {
124 /// A constant collection of rows.
125 Constant { rows: Vec<DiffRow> },
126 /// Get CDC data from an source, be it external reference to an existing source or an internal
127 /// reference to a `Let` identifier
128 Get { id: Id },
129 /// Create a temporary collection from given `value`, and make this bind only available
130 /// in scope of `body`
131 ///
132 /// Similar to this rust code snippet:
133 /// ```rust, ignore
134 /// {
135 /// let id = value;
136 /// body
137 /// }
138 Let {
139 id: LocalId,
140 value: Box<TypedPlan>,
141 body: Box<TypedPlan>,
142 },
143 /// Map, Filter, and Project operators. Chained together.
144 Mfp {
145 /// The input collection.
146 input: Box<TypedPlan>,
147 /// Linear operator to apply to each record.
148 mfp: MapFilterProject,
149 },
150 /// Reduce operator, aggregation by key assembled from KeyValPlan
151 Reduce {
152 /// The input collection.
153 input: Box<TypedPlan>,
154 /// A plan for changing input records into key, value pairs.
155 key_val_plan: KeyValPlan,
156 /// A plan for performing the reduce.
157 ///
158 /// The implementation of reduction has several different strategies based
159 /// on the properties of the reduction, and the input itself.
160 reduce_plan: ReducePlan,
161 },
162 /// A multiway relational equijoin, with fused map, filter, and projection.
163 ///
164 /// This stage performs a multiway join among `inputs`, using the equality
165 /// constraints expressed in `plan`. The plan also describes the implementation
166 /// strategy we will use, and any pushed down per-record work.
167 Join {
168 /// An ordered list of inputs that will be joined.
169 inputs: Vec<TypedPlan>,
170 /// Detailed information about the implementation of the join.
171 ///
172 /// This includes information about the implementation strategy, but also
173 /// any map, filter, project work that we might follow the join with, but
174 /// potentially pushed down into the implementation of the join.
175 plan: JoinPlan,
176 },
177 /// Adds the contents of the input collections.
178 ///
179 /// Importantly, this is *multiset* union, so the multiplicities of records will
180 /// add. This is in contrast to *set* union, where the multiplicities would be
181 /// capped at one. A set union can be formed with `Union` followed by `Reduce`
182 /// implementing the "distinct" operator.
183 Union {
184 /// The input collections
185 inputs: Vec<TypedPlan>,
186 /// Whether to consolidate the output, e.g., cancel negated records.
187 consolidate_output: bool,
188 },
189}
190
191impl Plan {
192 pub fn with_types(self, schema: RelationDesc) -> TypedPlan {
193 TypedPlan { schema, plan: self }
194 }
195}
196
197impl Plan {
198 /// Get nth expr using column ref
199 pub fn get_nth_expr(&self, n: usize) -> Option<ScalarExpr> {
200 match self {
201 Self::Mfp { mfp, .. } => mfp.get_nth_expr(n),
202 Self::Reduce { key_val_plan, .. } => key_val_plan.get_nth_expr(n),
203 _ => None,
204 }
205 }
206
207 /// Get the first input plan if exists
208 pub fn get_first_input_plan(&self) -> Option<&TypedPlan> {
209 match self {
210 Plan::Let { value, .. } => Some(value),
211 Plan::Mfp { input, .. } => Some(input),
212 Plan::Reduce { input, .. } => Some(input),
213 Plan::Join { inputs, .. } => inputs.first(),
214 Plan::Union { inputs, .. } => inputs.first(),
215 _ => None,
216 }
217 }
218
219 /// Get mutable ref to the first input plan if exists
220 pub fn get_mut_first_input_plan(&mut self) -> Option<&mut TypedPlan> {
221 match self {
222 Plan::Let { value, .. } => Some(value),
223 Plan::Mfp { input, .. } => Some(input),
224 Plan::Reduce { input, .. } => Some(input),
225 Plan::Join { inputs, .. } => inputs.first_mut(),
226 Plan::Union { inputs, .. } => inputs.first_mut(),
227 _ => None,
228 }
229 }
230
231 /// Find all the used collection in the plan
232 pub fn find_used_collection(&self) -> BTreeSet<GlobalId> {
233 fn recur_find_use(plan: &Plan, used: &mut BTreeSet<GlobalId>) {
234 match plan {
235 Plan::Get { id } => {
236 match id {
237 Id::Local(_) => (),
238 Id::Global(g) => {
239 used.insert(*g);
240 }
241 };
242 }
243 Plan::Let { value, body, .. } => {
244 recur_find_use(&value.plan, used);
245 recur_find_use(&body.plan, used);
246 }
247 Plan::Mfp { input, .. } => {
248 recur_find_use(&input.plan, used);
249 }
250 Plan::Reduce { input, .. } => {
251 recur_find_use(&input.plan, used);
252 }
253 Plan::Join { inputs, .. } => {
254 for input in inputs {
255 recur_find_use(&input.plan, used);
256 }
257 }
258 Plan::Union { inputs, .. } => {
259 for input in inputs {
260 recur_find_use(&input.plan, used);
261 }
262 }
263 _ => {}
264 }
265 }
266 let mut ret = Default::default();
267 recur_find_use(self, &mut ret);
268 ret
269 }
270}