1use std::collections::HashSet;
16
17use itertools::Itertools;
18use snafu::OptionExt;
19use substrait::substrait_proto_df::proto::{FilterRel, ReadRel};
20use substrait_proto::proto::expression::MaskExpression;
21use substrait_proto::proto::read_rel::ReadType;
22use substrait_proto::proto::rel::RelType;
23use substrait_proto::proto::{plan_rel, Plan as SubPlan, ProjectRel, Rel};
24
25use crate::error::{Error, InvalidQuerySnafu, NotImplementedSnafu, PlanSnafu, UnexpectedSnafu};
26use crate::expr::{MapFilterProject, TypedExpr};
27use crate::plan::{Plan, TypedPlan};
28use crate::repr::{self, RelationType};
29use crate::transform::{substrait_proto, FlownodeContext, FunctionExtensions};
30
31impl TypedPlan {
32 pub async fn from_substrait_plan(
34 ctx: &mut FlownodeContext,
35 plan: &SubPlan,
36 ) -> Result<TypedPlan, Error> {
37 let function_extension = FunctionExtensions::try_from_proto(&plan.extensions)?;
39
40 match plan.relations.len() {
42 1 => {
43 match plan.relations[0].rel_type.as_ref() {
44 Some(rt) => match rt {
45 plan_rel::RelType::Rel(rel) => {
46 Ok(TypedPlan::from_substrait_rel(ctx, rel, &function_extension).await?)
47 },
48 plan_rel::RelType::Root(root) => {
49 let input = root.input.as_ref().with_context(|| InvalidQuerySnafu {
50 reason: "Root relation without input",
51 })?;
52
53 let mut ret = TypedPlan::from_substrait_rel(ctx, input, &function_extension).await?;
54
55 if !root.names.is_empty() {
56 ret.schema = ret.schema.clone().try_with_names(root.names.clone())?;
57 }
58
59 Ok(ret)
60 }
61 },
62 None => plan_err!("Cannot parse plan relation: None")
63 }
64 },
65 _ => not_impl_err!(
66 "Substrait plan with more than 1 relation trees not supported. Number of relation trees: {:?}",
67 plan.relations.len()
68 )
69 }
70 }
71
72 #[async_recursion::async_recursion]
73 pub async fn from_substrait_project(
74 ctx: &mut FlownodeContext,
75 p: &ProjectRel,
76 extensions: &FunctionExtensions,
77 ) -> Result<TypedPlan, Error> {
78 let input = if let Some(input) = p.input.as_ref() {
79 TypedPlan::from_substrait_rel(ctx, input, extensions).await?
80 } else {
81 return not_impl_err!("Projection without an input is not supported");
82 };
83
84 let schema_before_expand = {
88 let input_schema = input.schema.clone();
89 let auto_columns: HashSet<usize> =
90 HashSet::from_iter(input_schema.typ().auto_columns.clone());
91 let not_auto_added_columns = (0..input_schema.len()?)
92 .filter(|i| !auto_columns.contains(i))
93 .collect_vec();
94 let mfp = MapFilterProject::new(input_schema.len()?)
95 .project(not_auto_added_columns)?
96 .into_safe();
97
98 input_schema.apply_mfp(&mfp)?
99 };
100
101 let mut exprs: Vec<TypedExpr> = Vec::with_capacity(p.expressions.len());
102 for e in &p.expressions {
103 let expr = TypedExpr::from_substrait_rex(e, &schema_before_expand, extensions).await?;
104 exprs.push(expr);
105 }
106 let is_literal = exprs.iter().all(|expr| expr.expr.is_literal());
107 if is_literal {
108 let (literals, lit_types): (Vec<_>, Vec<_>) = exprs
109 .into_iter()
110 .map(|TypedExpr { expr, typ }| (expr, typ))
111 .unzip();
112 let typ = RelationType::new(lit_types);
113 let row = literals
114 .into_iter()
115 .map(|lit| lit.as_literal().expect("A literal"))
116 .collect_vec();
117 let row = repr::Row::new(row);
118 let plan = Plan::Constant {
119 rows: vec![(row, repr::Timestamp::MIN, 1)],
120 };
121 Ok(TypedPlan {
122 schema: typ.into_unnamed(),
123 plan,
124 })
125 } else {
126 input.projection(exprs)
127 }
128 }
129
130 #[async_recursion::async_recursion]
131 pub async fn from_substrait_filter(
132 ctx: &mut FlownodeContext,
133 filter: &FilterRel,
134 extensions: &FunctionExtensions,
135 ) -> Result<TypedPlan, Error> {
136 let input = if let Some(input) = filter.input.as_ref() {
137 TypedPlan::from_substrait_rel(ctx, input, extensions).await?
138 } else {
139 return not_impl_err!("Filter without an input is not supported");
140 };
141
142 let expr = if let Some(condition) = filter.condition.as_ref() {
143 TypedExpr::from_substrait_rex(condition, &input.schema, extensions).await?
144 } else {
145 return not_impl_err!("Filter without an condition is not valid");
146 };
147 input.filter(expr)
148 }
149
150 pub async fn from_substrait_read(
151 ctx: &mut FlownodeContext,
152 read: &ReadRel,
153 _extensions: &FunctionExtensions,
154 ) -> Result<TypedPlan, Error> {
155 if let Some(ReadType::NamedTable(nt)) = &read.read_type {
156 let query_ctx = ctx.query_context.clone().context(UnexpectedSnafu {
157 reason: "Query context not found",
158 })?;
159 let table_reference = match nt.names.len() {
160 1 => [
161 query_ctx.current_catalog().to_string(),
162 query_ctx.current_schema().to_string(),
163 nt.names[0].clone(),
164 ],
165 2 => [
166 query_ctx.current_catalog().to_string(),
167 nt.names[0].clone(),
168 nt.names[1].clone(),
169 ],
170 3 => [
171 nt.names[0].clone(),
172 nt.names[1].clone(),
173 nt.names[2].clone(),
174 ],
175 _ => InvalidQuerySnafu {
176 reason: "Expect table to have name",
177 }
178 .fail()?,
179 };
180
181 let table = ctx.table(&table_reference).await?;
182 let get_table = Plan::Get {
183 id: crate::expr::Id::Global(table.0),
184 };
185 let get_table = TypedPlan {
186 schema: table.1,
187 plan: get_table,
188 };
189
190 if let Some(MaskExpression {
191 select: Some(projection),
192 ..
193 }) = &read.projection
194 {
195 let column_indices: Vec<usize> = projection
196 .struct_items
197 .iter()
198 .map(|item| item.field as usize)
199 .collect();
200 let input_arity = get_table.schema.typ().column_types.len();
201 let mfp = MapFilterProject::new(input_arity).project(column_indices.clone())?;
202 get_table.mfp(mfp.into_safe())
203 } else {
204 Ok(get_table)
205 }
206 } else {
207 not_impl_err!("Only NamedTable reads are supported")
208 }
209 }
210
211 pub async fn from_substrait_rel(
214 ctx: &mut FlownodeContext,
215 rel: &Rel,
216 extensions: &FunctionExtensions,
217 ) -> Result<TypedPlan, Error> {
218 match &rel.rel_type {
219 Some(RelType::Project(p)) => {
220 Self::from_substrait_project(ctx, p.as_ref(), extensions).await
221 }
222 Some(RelType::Filter(filter)) => {
223 Self::from_substrait_filter(ctx, filter, extensions).await
224 }
225 Some(RelType::Read(read)) => Self::from_substrait_read(ctx, read, extensions).await,
226 Some(RelType::Aggregate(agg)) => {
227 Self::from_substrait_agg_rel(ctx, agg, extensions).await
228 }
229 _ => not_impl_err!("Unsupported relation type: {:?}", rel.rel_type),
230 }
231 }
232}
233
#[cfg(test)]
mod test {
    use pretty_assertions::assert_eq;

    use super::*;
    use crate::expr::GlobalId;
    use crate::plan::{Plan, TypedPlan};
    use crate::repr::{ColumnType, RelationType};
    use crate::transform::test::{create_test_ctx, create_test_query_engine, sql_to_substrait};
    use crate::transform::CDT;

    /// `SELECT number FROM numbers` should become an identity `Mfp` (arity 1) over a
    /// `Get` of global id `User(0)`. Both the `Get` and the outer plan carry a single
    /// non-nullable `uint32` column named `number`.
    #[tokio::test]
    async fn test_select() {
        let engine = create_test_query_engine();
        let sql = "SELECT number FROM numbers";
        let substrait_plan = sql_to_substrait(engine.clone(), sql).await;

        let mut flow_ctx = create_test_ctx();
        let actual = TypedPlan::from_substrait_plan(&mut flow_ctx, &substrait_plan).await;

        // The same single-column schema describes both the source table and the output.
        let number_schema =
            RelationType::new(vec![ColumnType::new(CDT::uint32_datatype(), false)])
                .into_named(vec![Some("number".to_string())]);

        let source = Plan::Get {
            id: crate::expr::Id::Global(GlobalId::User(0)),
        }
        .with_types(number_schema.clone());

        let expected = TypedPlan {
            schema: number_schema,
            plan: Plan::Mfp {
                input: Box::new(source),
                mfp: MapFilterProject::new(1),
            },
        };

        assert_eq!(actual.unwrap(), expected);
    }
}