1use std::collections::HashSet;
16
17use itertools::Itertools;
18use snafu::OptionExt;
19use substrait::substrait_proto_df::proto::{FilterRel, ReadRel};
20use substrait_proto::proto::expression::MaskExpression;
21use substrait_proto::proto::read_rel::ReadType;
22use substrait_proto::proto::rel::RelType;
23use substrait_proto::proto::{Plan as SubPlan, ProjectRel, Rel, plan_rel};
24
25use crate::error::{Error, InvalidQuerySnafu, NotImplementedSnafu, PlanSnafu, UnexpectedSnafu};
26use crate::expr::{MapFilterProject, TypedExpr};
27use crate::plan::{Plan, TypedPlan};
28use crate::repr::{self, RelationType};
29use crate::transform::{FlownodeContext, FunctionExtensions, substrait_proto};
30
31impl TypedPlan {
32 pub async fn from_substrait_plan(
34 ctx: &mut FlownodeContext,
35 plan: &SubPlan,
36 ) -> Result<TypedPlan, Error> {
37 let function_extension = FunctionExtensions::try_from_proto(&plan.extensions)?;
39
40 match plan.relations.len() {
42 1 => match plan.relations[0].rel_type.as_ref() {
43 Some(rt) => match rt {
44 plan_rel::RelType::Rel(rel) => {
45 Ok(TypedPlan::from_substrait_rel(ctx, rel, &function_extension).await?)
46 }
47 plan_rel::RelType::Root(root) => {
48 let input = root.input.as_ref().with_context(|| InvalidQuerySnafu {
49 reason: "Root relation without input",
50 })?;
51
52 let mut ret =
53 TypedPlan::from_substrait_rel(ctx, input, &function_extension).await?;
54
55 if !root.names.is_empty() {
56 ret.schema = ret.schema.clone().try_with_names(root.names.clone())?;
57 }
58
59 Ok(ret)
60 }
61 },
62 None => plan_err!("Cannot parse plan relation: None"),
63 },
64 _ => not_impl_err!(
65 "Substrait plan with more than 1 relation trees not supported. Number of relation trees: {:?}",
66 plan.relations.len()
67 ),
68 }
69 }
70
71 #[async_recursion::async_recursion]
72 pub async fn from_substrait_project(
73 ctx: &mut FlownodeContext,
74 p: &ProjectRel,
75 extensions: &FunctionExtensions,
76 ) -> Result<TypedPlan, Error> {
77 let input = if let Some(input) = p.input.as_ref() {
78 TypedPlan::from_substrait_rel(ctx, input, extensions).await?
79 } else {
80 return not_impl_err!("Projection without an input is not supported");
81 };
82
83 let schema_before_expand = {
87 let input_schema = input.schema.clone();
88 let auto_columns: HashSet<usize> =
89 HashSet::from_iter(input_schema.typ().auto_columns.clone());
90 let not_auto_added_columns = (0..input_schema.len()?)
91 .filter(|i| !auto_columns.contains(i))
92 .collect_vec();
93 let mfp = MapFilterProject::new(input_schema.len()?)
94 .project(not_auto_added_columns)?
95 .into_safe();
96
97 input_schema.apply_mfp(&mfp)?
98 };
99
100 let mut exprs: Vec<TypedExpr> = Vec::with_capacity(p.expressions.len());
101 for e in &p.expressions {
102 let expr = TypedExpr::from_substrait_rex(e, &schema_before_expand, extensions).await?;
103 exprs.push(expr);
104 }
105 let is_literal = exprs.iter().all(|expr| expr.expr.is_literal());
106 if is_literal {
107 let (literals, lit_types): (Vec<_>, Vec<_>) = exprs
108 .into_iter()
109 .map(|TypedExpr { expr, typ }| (expr, typ))
110 .unzip();
111 let typ = RelationType::new(lit_types);
112 let row = literals
113 .into_iter()
114 .map(|lit| lit.as_literal().expect("A literal"))
115 .collect_vec();
116 let row = repr::Row::new(row);
117 let plan = Plan::Constant {
118 rows: vec![(row, repr::Timestamp::MIN, 1)],
119 };
120 Ok(TypedPlan {
121 schema: typ.into_unnamed(),
122 plan,
123 })
124 } else {
125 input.projection(exprs)
126 }
127 }
128
129 #[async_recursion::async_recursion]
130 pub async fn from_substrait_filter(
131 ctx: &mut FlownodeContext,
132 filter: &FilterRel,
133 extensions: &FunctionExtensions,
134 ) -> Result<TypedPlan, Error> {
135 let input = if let Some(input) = filter.input.as_ref() {
136 TypedPlan::from_substrait_rel(ctx, input, extensions).await?
137 } else {
138 return not_impl_err!("Filter without an input is not supported");
139 };
140
141 let expr = if let Some(condition) = filter.condition.as_ref() {
142 TypedExpr::from_substrait_rex(condition, &input.schema, extensions).await?
143 } else {
144 return not_impl_err!("Filter without an condition is not valid");
145 };
146 input.filter(expr)
147 }
148
149 pub async fn from_substrait_read(
150 ctx: &mut FlownodeContext,
151 read: &ReadRel,
152 _extensions: &FunctionExtensions,
153 ) -> Result<TypedPlan, Error> {
154 if let Some(ReadType::NamedTable(nt)) = &read.read_type {
155 let query_ctx = ctx.query_context.clone().context(UnexpectedSnafu {
156 reason: "Query context not found",
157 })?;
158 let table_reference = match nt.names.len() {
159 1 => [
160 query_ctx.current_catalog().to_string(),
161 query_ctx.current_schema().to_string(),
162 nt.names[0].clone(),
163 ],
164 2 => [
165 query_ctx.current_catalog().to_string(),
166 nt.names[0].clone(),
167 nt.names[1].clone(),
168 ],
169 3 => [
170 nt.names[0].clone(),
171 nt.names[1].clone(),
172 nt.names[2].clone(),
173 ],
174 _ => InvalidQuerySnafu {
175 reason: "Expect table to have name",
176 }
177 .fail()?,
178 };
179
180 let table = ctx.table(&table_reference).await?;
181 let get_table = Plan::Get {
182 id: crate::expr::Id::Global(table.0),
183 };
184 let get_table = TypedPlan {
185 schema: table.1,
186 plan: get_table,
187 };
188
189 if let Some(MaskExpression {
190 select: Some(projection),
191 ..
192 }) = &read.projection
193 {
194 let column_indices: Vec<usize> = projection
195 .struct_items
196 .iter()
197 .map(|item| item.field as usize)
198 .collect();
199 let input_arity = get_table.schema.typ().column_types.len();
200 let mfp = MapFilterProject::new(input_arity).project(column_indices.clone())?;
201 get_table.mfp(mfp.into_safe())
202 } else {
203 Ok(get_table)
204 }
205 } else {
206 not_impl_err!("Only NamedTable reads are supported")
207 }
208 }
209
210 pub async fn from_substrait_rel(
213 ctx: &mut FlownodeContext,
214 rel: &Rel,
215 extensions: &FunctionExtensions,
216 ) -> Result<TypedPlan, Error> {
217 match &rel.rel_type {
218 Some(RelType::Project(p)) => {
219 Self::from_substrait_project(ctx, p.as_ref(), extensions).await
220 }
221 Some(RelType::Filter(filter)) => {
222 Self::from_substrait_filter(ctx, filter, extensions).await
223 }
224 Some(RelType::Read(read)) => Self::from_substrait_read(ctx, read, extensions).await,
225 Some(RelType::Aggregate(agg)) => {
226 Self::from_substrait_agg_rel(ctx, agg, extensions).await
227 }
228 _ => not_impl_err!("Unsupported relation type: {:?}", rel.rel_type),
229 }
230 }
231}
232
#[cfg(test)]
mod test {
    use datatypes::data_type::ConcreteDataType as CDT;
    use datatypes::prelude::ConcreteDataType;
    use pretty_assertions::assert_eq;

    use super::*;
    use crate::expr::GlobalId;
    use crate::plan::{Plan, TypedPlan};
    use crate::repr::{ColumnType, RelationType};
    use crate::transform::test::{create_test_ctx, create_test_query_engine, sql_to_substrait};

    /// Checks the flow plan produced for a single-column `SELECT` on `numbers`.
    #[tokio::test]
    async fn test_select() {
        let engine = create_test_query_engine();
        let sql = "SELECT number FROM numbers";
        let plan = sql_to_substrait(engine.clone(), sql).await;

        let mut ctx = create_test_ctx();
        let flow_plan = TypedPlan::from_substrait_plan(&mut ctx, &plan).await;

        // Schema expected on the resulting plan: one non-nullable `uint32`
        // column named `number`.
        let output_schema =
            RelationType::new(vec![ColumnType::new(CDT::uint32_datatype(), false)])
                .into_named(vec![Some("number".to_string())]);

        // The table read that feeds the projection, with the same column layout.
        let source_schema = RelationType::new(vec![ColumnType::new(
            ConcreteDataType::uint32_datatype(),
            false,
        )])
        .into_named(vec![Some("number".to_string())]);
        let source = Plan::Get {
            id: crate::expr::Id::Global(GlobalId::User(0)),
        }
        .with_types(source_schema);

        let expected = TypedPlan {
            schema: output_schema,
            plan: Plan::Mfp {
                input: Box::new(source),
                mfp: MapFilterProject::new(1),
            },
        };

        assert_eq!(flow_plan.unwrap(), expected);
    }
}