flow/transform/
plan.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16
17use itertools::Itertools;
18use snafu::OptionExt;
19use substrait::substrait_proto_df::proto::{FilterRel, ReadRel};
20use substrait_proto::proto::expression::MaskExpression;
21use substrait_proto::proto::read_rel::ReadType;
22use substrait_proto::proto::rel::RelType;
23use substrait_proto::proto::{Plan as SubPlan, ProjectRel, Rel, plan_rel};
24
25use crate::error::{Error, InvalidQuerySnafu, NotImplementedSnafu, PlanSnafu, UnexpectedSnafu};
26use crate::expr::{MapFilterProject, TypedExpr};
27use crate::plan::{Plan, TypedPlan};
28use crate::repr::{self, RelationType};
29use crate::transform::{FlownodeContext, FunctionExtensions, substrait_proto};
30
31impl TypedPlan {
32    /// Convert Substrait Plan into Flow's TypedPlan
33    pub async fn from_substrait_plan(
34        ctx: &mut FlownodeContext,
35        plan: &SubPlan,
36    ) -> Result<TypedPlan, Error> {
37        // Register function extension
38        let function_extension = FunctionExtensions::try_from_proto(&plan.extensions)?;
39
40        // Parse relations
41        match plan.relations.len() {
42            1 => match plan.relations[0].rel_type.as_ref() {
43                Some(rt) => match rt {
44                    plan_rel::RelType::Rel(rel) => {
45                        Ok(TypedPlan::from_substrait_rel(ctx, rel, &function_extension).await?)
46                    }
47                    plan_rel::RelType::Root(root) => {
48                        let input = root.input.as_ref().with_context(|| InvalidQuerySnafu {
49                            reason: "Root relation without input",
50                        })?;
51
52                        let mut ret =
53                            TypedPlan::from_substrait_rel(ctx, input, &function_extension).await?;
54
55                        if !root.names.is_empty() {
56                            ret.schema = ret.schema.clone().try_with_names(root.names.clone())?;
57                        }
58
59                        Ok(ret)
60                    }
61                },
62                None => plan_err!("Cannot parse plan relation: None"),
63            },
64            _ => not_impl_err!(
65                "Substrait plan with more than 1 relation trees not supported. Number of relation trees: {:?}",
66                plan.relations.len()
67            ),
68        }
69    }
70
71    #[async_recursion::async_recursion]
72    pub async fn from_substrait_project(
73        ctx: &mut FlownodeContext,
74        p: &ProjectRel,
75        extensions: &FunctionExtensions,
76    ) -> Result<TypedPlan, Error> {
77        let input = if let Some(input) = p.input.as_ref() {
78            TypedPlan::from_substrait_rel(ctx, input, extensions).await?
79        } else {
80            return not_impl_err!("Projection without an input is not supported");
81        };
82
83        // because this `input.schema` is incorrect for pre-expand substrait plan, so we have to use schema before expand multi-value
84        // function to correctly transform it, and late rewrite it
85        // TODO(discord9): this logic is obsoleted since now expand happens in datafusion optimizer
86        let schema_before_expand = {
87            let input_schema = input.schema.clone();
88            let auto_columns: HashSet<usize> =
89                HashSet::from_iter(input_schema.typ().auto_columns.clone());
90            let not_auto_added_columns = (0..input_schema.len()?)
91                .filter(|i| !auto_columns.contains(i))
92                .collect_vec();
93            let mfp = MapFilterProject::new(input_schema.len()?)
94                .project(not_auto_added_columns)?
95                .into_safe();
96
97            input_schema.apply_mfp(&mfp)?
98        };
99
100        let mut exprs: Vec<TypedExpr> = Vec::with_capacity(p.expressions.len());
101        for e in &p.expressions {
102            let expr = TypedExpr::from_substrait_rex(e, &schema_before_expand, extensions).await?;
103            exprs.push(expr);
104        }
105        let is_literal = exprs.iter().all(|expr| expr.expr.is_literal());
106        if is_literal {
107            let (literals, lit_types): (Vec<_>, Vec<_>) = exprs
108                .into_iter()
109                .map(|TypedExpr { expr, typ }| (expr, typ))
110                .unzip();
111            let typ = RelationType::new(lit_types);
112            let row = literals
113                .into_iter()
114                .map(|lit| lit.as_literal().expect("A literal"))
115                .collect_vec();
116            let row = repr::Row::new(row);
117            let plan = Plan::Constant {
118                rows: vec![(row, repr::Timestamp::MIN, 1)],
119            };
120            Ok(TypedPlan {
121                schema: typ.into_unnamed(),
122                plan,
123            })
124        } else {
125            input.projection(exprs)
126        }
127    }
128
129    #[async_recursion::async_recursion]
130    pub async fn from_substrait_filter(
131        ctx: &mut FlownodeContext,
132        filter: &FilterRel,
133        extensions: &FunctionExtensions,
134    ) -> Result<TypedPlan, Error> {
135        let input = if let Some(input) = filter.input.as_ref() {
136            TypedPlan::from_substrait_rel(ctx, input, extensions).await?
137        } else {
138            return not_impl_err!("Filter without an input is not supported");
139        };
140
141        let expr = if let Some(condition) = filter.condition.as_ref() {
142            TypedExpr::from_substrait_rex(condition, &input.schema, extensions).await?
143        } else {
144            return not_impl_err!("Filter without an condition is not valid");
145        };
146        input.filter(expr)
147    }
148
149    pub async fn from_substrait_read(
150        ctx: &mut FlownodeContext,
151        read: &ReadRel,
152        _extensions: &FunctionExtensions,
153    ) -> Result<TypedPlan, Error> {
154        if let Some(ReadType::NamedTable(nt)) = &read.read_type {
155            let query_ctx = ctx.query_context.clone().context(UnexpectedSnafu {
156                reason: "Query context not found",
157            })?;
158            let table_reference = match nt.names.len() {
159                1 => [
160                    query_ctx.current_catalog().to_string(),
161                    query_ctx.current_schema().to_string(),
162                    nt.names[0].clone(),
163                ],
164                2 => [
165                    query_ctx.current_catalog().to_string(),
166                    nt.names[0].clone(),
167                    nt.names[1].clone(),
168                ],
169                3 => [
170                    nt.names[0].clone(),
171                    nt.names[1].clone(),
172                    nt.names[2].clone(),
173                ],
174                _ => InvalidQuerySnafu {
175                    reason: "Expect table to have name",
176                }
177                .fail()?,
178            };
179
180            let table = ctx.table(&table_reference).await?;
181            let get_table = Plan::Get {
182                id: crate::expr::Id::Global(table.0),
183            };
184            let get_table = TypedPlan {
185                schema: table.1,
186                plan: get_table,
187            };
188
189            if let Some(MaskExpression {
190                select: Some(projection),
191                ..
192            }) = &read.projection
193            {
194                let column_indices: Vec<usize> = projection
195                    .struct_items
196                    .iter()
197                    .map(|item| item.field as usize)
198                    .collect();
199                let input_arity = get_table.schema.typ().column_types.len();
200                let mfp = MapFilterProject::new(input_arity).project(column_indices.clone())?;
201                get_table.mfp(mfp.into_safe())
202            } else {
203                Ok(get_table)
204            }
205        } else {
206            not_impl_err!("Only NamedTable reads are supported")
207        }
208    }
209
210    /// Convert Substrait Rel into Flow's TypedPlan
211    /// TODO(discord9): SELECT DISTINCT(does it get compile with something else?)
212    pub async fn from_substrait_rel(
213        ctx: &mut FlownodeContext,
214        rel: &Rel,
215        extensions: &FunctionExtensions,
216    ) -> Result<TypedPlan, Error> {
217        match &rel.rel_type {
218            Some(RelType::Project(p)) => {
219                Self::from_substrait_project(ctx, p.as_ref(), extensions).await
220            }
221            Some(RelType::Filter(filter)) => {
222                Self::from_substrait_filter(ctx, filter, extensions).await
223            }
224            Some(RelType::Read(read)) => Self::from_substrait_read(ctx, read, extensions).await,
225            Some(RelType::Aggregate(agg)) => {
226                Self::from_substrait_agg_rel(ctx, agg, extensions).await
227            }
228            _ => not_impl_err!("Unsupported relation type: {:?}", rel.rel_type),
229        }
230    }
231}
232
#[cfg(test)]
mod test {
    use datatypes::data_type::ConcreteDataType as CDT;
    use pretty_assertions::assert_eq;

    use super::*;
    use crate::expr::GlobalId;
    use crate::plan::{Plan, TypedPlan};
    use crate::repr::{ColumnType, RelationType};
    use crate::transform::test::{create_test_ctx, create_test_query_engine, sql_to_substrait};

    /// A plain single-column select over `numbers` should become an identity `Mfp` over a `Get`
    /// of the table, with a single non-nullable `uint32` column named `number`.
    #[tokio::test]
    async fn test_select() {
        let sql = "SELECT number FROM numbers";
        let substrait_plan = sql_to_substrait(create_test_query_engine(), sql).await;

        let mut ctx = create_test_ctx();
        let converted = TypedPlan::from_substrait_plan(&mut ctx, &substrait_plan).await;

        // The scanned table and the projection output share the same named schema.
        let number_schema = || {
            RelationType::new(vec![ColumnType::new(CDT::uint32_datatype(), false)])
                .into_named(vec![Some("number".to_string())])
        };

        let scan = Plan::Get {
            id: crate::expr::Id::Global(GlobalId::User(0)),
        }
        .with_types(number_schema());
        let expected = TypedPlan {
            schema: number_schema(),
            plan: Plan::Mfp {
                input: Box::new(scan),
                mfp: MapFilterProject::new(1),
            },
        };

        assert_eq!(converted.unwrap(), expected);
    }
}