substrait/
df_substrait.rs1use std::sync::Arc;
16
17use async_trait::async_trait;
18use bytes::{Buf, Bytes, BytesMut};
19use common_function::aggrs::aggr_wrapper::fix_order::{
20 FixStateUdafOrderingAnalyzer, UnFixStateUdafOrderingAnalyzer,
21};
22use datafusion::execution::SessionStateBuilder;
23use datafusion::execution::context::SessionState;
24use datafusion::execution::runtime_env::RuntimeEnv;
25use datafusion::optimizer::AnalyzerRule;
26use datafusion::prelude::SessionConfig;
27use datafusion_expr::LogicalPlan;
28use datafusion_substrait::logical_plan::consumer::from_substrait_plan;
29use datafusion_substrait::logical_plan::producer::to_substrait_plan;
30use datafusion_substrait::substrait::proto::Plan;
31use prost::Message;
32use snafu::ResultExt;
33
34use crate::error::{DecodeDfPlanSnafu, DecodeRelSnafu, EncodeDfPlanSnafu, EncodeRelSnafu, Error};
35use crate::{SerializerRegistry, SubstraitPlan};
36
/// Stateless converter between DataFusion [`LogicalPlan`]s and Substrait
/// [`Plan`] messages.
///
/// The type carries no data; everything it needs is passed to its methods.
/// It derives the common traits so it can be debug-printed, copied freely,
/// and built through `Default` in addition to the plain unit-struct syntax.
#[derive(Debug, Clone, Copy, Default)]
pub struct DFLogicalSubstraitConvertor;
38
39#[async_trait]
40impl SubstraitPlan for DFLogicalSubstraitConvertor {
41 type Error = Error;
42
43 type Plan = LogicalPlan;
44
45 async fn decode<B: Buf + Send>(
46 &self,
47 message: B,
48 state: SessionState,
49 ) -> Result<Self::Plan, Self::Error> {
50 let plan = Plan::decode(message).context(DecodeRelSnafu)?;
51 let df_plan = from_substrait_plan(&state, &plan)
52 .await
53 .context(DecodeDfPlanSnafu)?;
54 let df_plan = FixStateUdafOrderingAnalyzer {}
55 .analyze(df_plan, state.config_options())
56 .context(DecodeDfPlanSnafu)?;
57 Ok(df_plan)
58 }
59
60 fn encode(
61 &self,
62 plan: &Self::Plan,
63 serializer: impl SerializerRegistry + 'static,
64 ) -> Result<Bytes, Self::Error> {
65 let plan = UnFixStateUdafOrderingAnalyzer {}
66 .analyze(plan.clone(), &Default::default())
67 .context(EncodeDfPlanSnafu)?;
68 let mut buf = BytesMut::new();
69 let substrait_plan = self.to_sub_plan(&plan, serializer)?;
70 substrait_plan.encode(&mut buf).context(EncodeRelSnafu)?;
71
72 Ok(buf.freeze())
73 }
74}
75
76impl DFLogicalSubstraitConvertor {
77 pub fn to_sub_plan(
78 &self,
79 plan: &LogicalPlan,
80 serializer: impl SerializerRegistry + 'static,
81 ) -> Result<Box<Plan>, Error> {
82 let state = SessionStateBuilder::new()
83 .with_config(SessionConfig::new())
84 .with_runtime_env(Arc::new(RuntimeEnv::default()))
85 .with_default_features()
86 .with_serializer_registry(Arc::new(serializer))
87 .build();
88 to_substrait_plan(plan, &state).context(EncodeDfPlanSnafu)
89 }
90}