flow/transform/
plan.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16
17use itertools::Itertools;
18use snafu::OptionExt;
19use substrait::substrait_proto_df::proto::{FilterRel, ReadRel};
20use substrait_proto::proto::expression::MaskExpression;
21use substrait_proto::proto::read_rel::ReadType;
22use substrait_proto::proto::rel::RelType;
23use substrait_proto::proto::{plan_rel, Plan as SubPlan, ProjectRel, Rel};
24
25use crate::error::{Error, InvalidQuerySnafu, NotImplementedSnafu, PlanSnafu, UnexpectedSnafu};
26use crate::expr::{MapFilterProject, TypedExpr};
27use crate::plan::{Plan, TypedPlan};
28use crate::repr::{self, RelationType};
29use crate::transform::{substrait_proto, FlownodeContext, FunctionExtensions};
30
31impl TypedPlan {
32    /// Convert Substrait Plan into Flow's TypedPlan
33    pub async fn from_substrait_plan(
34        ctx: &mut FlownodeContext,
35        plan: &SubPlan,
36    ) -> Result<TypedPlan, Error> {
37        // Register function extension
38        let function_extension = FunctionExtensions::try_from_proto(&plan.extensions)?;
39
40        // Parse relations
41        match plan.relations.len() {
42        1 => {
43            match plan.relations[0].rel_type.as_ref() {
44                Some(rt) => match rt {
45                    plan_rel::RelType::Rel(rel) => {
46                        Ok(TypedPlan::from_substrait_rel(ctx, rel, &function_extension).await?)
47                    },
48                    plan_rel::RelType::Root(root) => {
49                        let input = root.input.as_ref().with_context(|| InvalidQuerySnafu {
50                            reason: "Root relation without input",
51                        })?;
52
53                        let mut ret = TypedPlan::from_substrait_rel(ctx, input, &function_extension).await?;
54
55                        if !root.names.is_empty() {
56                            ret.schema = ret.schema.clone().try_with_names(root.names.clone())?;
57                        }
58
59                        Ok(ret)
60                    }
61                },
62                None => plan_err!("Cannot parse plan relation: None")
63            }
64        },
65        _ => not_impl_err!(
66            "Substrait plan with more than 1 relation trees not supported. Number of relation trees: {:?}",
67            plan.relations.len()
68        )
69    }
70    }
71
72    #[async_recursion::async_recursion]
73    pub async fn from_substrait_project(
74        ctx: &mut FlownodeContext,
75        p: &ProjectRel,
76        extensions: &FunctionExtensions,
77    ) -> Result<TypedPlan, Error> {
78        let input = if let Some(input) = p.input.as_ref() {
79            TypedPlan::from_substrait_rel(ctx, input, extensions).await?
80        } else {
81            return not_impl_err!("Projection without an input is not supported");
82        };
83
84        // because this `input.schema` is incorrect for pre-expand substrait plan, so we have to use schema before expand multi-value
85        // function to correctly transform it, and late rewrite it
86        // TODO(discord9): this logic is obsoleted since now expand happens in datafusion optimizer
87        let schema_before_expand = {
88            let input_schema = input.schema.clone();
89            let auto_columns: HashSet<usize> =
90                HashSet::from_iter(input_schema.typ().auto_columns.clone());
91            let not_auto_added_columns = (0..input_schema.len()?)
92                .filter(|i| !auto_columns.contains(i))
93                .collect_vec();
94            let mfp = MapFilterProject::new(input_schema.len()?)
95                .project(not_auto_added_columns)?
96                .into_safe();
97
98            input_schema.apply_mfp(&mfp)?
99        };
100
101        let mut exprs: Vec<TypedExpr> = Vec::with_capacity(p.expressions.len());
102        for e in &p.expressions {
103            let expr = TypedExpr::from_substrait_rex(e, &schema_before_expand, extensions).await?;
104            exprs.push(expr);
105        }
106        let is_literal = exprs.iter().all(|expr| expr.expr.is_literal());
107        if is_literal {
108            let (literals, lit_types): (Vec<_>, Vec<_>) = exprs
109                .into_iter()
110                .map(|TypedExpr { expr, typ }| (expr, typ))
111                .unzip();
112            let typ = RelationType::new(lit_types);
113            let row = literals
114                .into_iter()
115                .map(|lit| lit.as_literal().expect("A literal"))
116                .collect_vec();
117            let row = repr::Row::new(row);
118            let plan = Plan::Constant {
119                rows: vec![(row, repr::Timestamp::MIN, 1)],
120            };
121            Ok(TypedPlan {
122                schema: typ.into_unnamed(),
123                plan,
124            })
125        } else {
126            input.projection(exprs)
127        }
128    }
129
130    #[async_recursion::async_recursion]
131    pub async fn from_substrait_filter(
132        ctx: &mut FlownodeContext,
133        filter: &FilterRel,
134        extensions: &FunctionExtensions,
135    ) -> Result<TypedPlan, Error> {
136        let input = if let Some(input) = filter.input.as_ref() {
137            TypedPlan::from_substrait_rel(ctx, input, extensions).await?
138        } else {
139            return not_impl_err!("Filter without an input is not supported");
140        };
141
142        let expr = if let Some(condition) = filter.condition.as_ref() {
143            TypedExpr::from_substrait_rex(condition, &input.schema, extensions).await?
144        } else {
145            return not_impl_err!("Filter without an condition is not valid");
146        };
147        input.filter(expr)
148    }
149
150    pub async fn from_substrait_read(
151        ctx: &mut FlownodeContext,
152        read: &ReadRel,
153        _extensions: &FunctionExtensions,
154    ) -> Result<TypedPlan, Error> {
155        if let Some(ReadType::NamedTable(nt)) = &read.read_type {
156            let query_ctx = ctx.query_context.clone().context(UnexpectedSnafu {
157                reason: "Query context not found",
158            })?;
159            let table_reference = match nt.names.len() {
160                1 => [
161                    query_ctx.current_catalog().to_string(),
162                    query_ctx.current_schema().to_string(),
163                    nt.names[0].clone(),
164                ],
165                2 => [
166                    query_ctx.current_catalog().to_string(),
167                    nt.names[0].clone(),
168                    nt.names[1].clone(),
169                ],
170                3 => [
171                    nt.names[0].clone(),
172                    nt.names[1].clone(),
173                    nt.names[2].clone(),
174                ],
175                _ => InvalidQuerySnafu {
176                    reason: "Expect table to have name",
177                }
178                .fail()?,
179            };
180
181            let table = ctx.table(&table_reference).await?;
182            let get_table = Plan::Get {
183                id: crate::expr::Id::Global(table.0),
184            };
185            let get_table = TypedPlan {
186                schema: table.1,
187                plan: get_table,
188            };
189
190            if let Some(MaskExpression {
191                select: Some(projection),
192                ..
193            }) = &read.projection
194            {
195                let column_indices: Vec<usize> = projection
196                    .struct_items
197                    .iter()
198                    .map(|item| item.field as usize)
199                    .collect();
200                let input_arity = get_table.schema.typ().column_types.len();
201                let mfp = MapFilterProject::new(input_arity).project(column_indices.clone())?;
202                get_table.mfp(mfp.into_safe())
203            } else {
204                Ok(get_table)
205            }
206        } else {
207            not_impl_err!("Only NamedTable reads are supported")
208        }
209    }
210
211    /// Convert Substrait Rel into Flow's TypedPlan
212    /// TODO(discord9): SELECT DISTINCT(does it get compile with something else?)
213    pub async fn from_substrait_rel(
214        ctx: &mut FlownodeContext,
215        rel: &Rel,
216        extensions: &FunctionExtensions,
217    ) -> Result<TypedPlan, Error> {
218        match &rel.rel_type {
219            Some(RelType::Project(p)) => {
220                Self::from_substrait_project(ctx, p.as_ref(), extensions).await
221            }
222            Some(RelType::Filter(filter)) => {
223                Self::from_substrait_filter(ctx, filter, extensions).await
224            }
225            Some(RelType::Read(read)) => Self::from_substrait_read(ctx, read, extensions).await,
226            Some(RelType::Aggregate(agg)) => {
227                Self::from_substrait_agg_rel(ctx, agg, extensions).await
228            }
229            _ => not_impl_err!("Unsupported relation type: {:?}", rel.rel_type),
230        }
231    }
232}
233
#[cfg(test)]
mod test {
    use pretty_assertions::assert_eq;

    use super::*;
    use crate::expr::GlobalId;
    use crate::plan::{Plan, TypedPlan};
    use crate::repr::{ColumnType, RelationType};
    use crate::transform::test::{create_test_ctx, create_test_query_engine, sql_to_substrait};
    use crate::transform::CDT;

    /// Selecting the only column of `numbers` should produce an identity MFP over a read
    /// of that table. The result keeps the table's schema.
    #[tokio::test]
    async fn test_select() {
        // Schema of the `numbers` test table: one non-nullable u32 column named `number`.
        let numbers_schema = || {
            RelationType::new(vec![ColumnType::new(CDT::uint32_datatype(), false)])
                .into_named(vec![Some("number".to_string())])
        };

        let engine = create_test_query_engine();
        let substrait = sql_to_substrait(engine.clone(), "SELECT number FROM numbers").await;

        let mut ctx = create_test_ctx();
        let actual = TypedPlan::from_substrait_plan(&mut ctx, &substrait).await;

        let table_read = Plan::Get {
            id: crate::expr::Id::Global(GlobalId::User(0)),
        }
        .with_types(numbers_schema());
        let expected = TypedPlan {
            schema: numbers_schema(),
            plan: Plan::Mfp {
                input: Box::new(table_read),
                mfp: MapFilterProject::new(1),
            },
        };

        assert_eq!(actual.unwrap(), expected);
    }
}