substrait/
df_substrait.rs1use std::sync::Arc;
16
17use async_trait::async_trait;
18use bytes::{Buf, Bytes, BytesMut};
19use datafusion::execution::context::SessionState;
20use datafusion::execution::runtime_env::RuntimeEnv;
21use datafusion::execution::SessionStateBuilder;
22use datafusion::prelude::SessionConfig;
23use datafusion_expr::LogicalPlan;
24use datafusion_substrait::logical_plan::consumer::from_substrait_plan;
25use datafusion_substrait::logical_plan::producer::to_substrait_plan;
26use datafusion_substrait::substrait::proto::Plan;
27use prost::Message;
28use snafu::ResultExt;
29
30use crate::error::{DecodeDfPlanSnafu, DecodeRelSnafu, EncodeDfPlanSnafu, EncodeRelSnafu, Error};
31use crate::{SerializerRegistry, SubstraitPlan};
32
/// Stateless convertor between DataFusion [`LogicalPlan`]s and Substrait
/// [`Plan`] protobuf messages.
///
/// The type carries no data; all conversion entry points take `&self` only to
/// satisfy the [`SubstraitPlan`] trait and to allow method-call syntax.
#[derive(Debug, Clone, Copy, Default)]
pub struct DFLogicalSubstraitConvertor;
34
35#[async_trait]
36impl SubstraitPlan for DFLogicalSubstraitConvertor {
37 type Error = Error;
38
39 type Plan = LogicalPlan;
40
41 async fn decode<B: Buf + Send>(
42 &self,
43 message: B,
44 state: SessionState,
45 ) -> Result<Self::Plan, Self::Error> {
46 let plan = Plan::decode(message).context(DecodeRelSnafu)?;
47 let df_plan = from_substrait_plan(&state, &plan)
48 .await
49 .context(DecodeDfPlanSnafu)?;
50 Ok(df_plan)
51 }
52
53 fn encode(
54 &self,
55 plan: &Self::Plan,
56 serializer: impl SerializerRegistry + 'static,
57 ) -> Result<Bytes, Self::Error> {
58 let mut buf = BytesMut::new();
59 let substrait_plan = self.to_sub_plan(plan, serializer)?;
60 substrait_plan.encode(&mut buf).context(EncodeRelSnafu)?;
61
62 Ok(buf.freeze())
63 }
64}
65
66impl DFLogicalSubstraitConvertor {
67 pub fn to_sub_plan(
68 &self,
69 plan: &LogicalPlan,
70 serializer: impl SerializerRegistry + 'static,
71 ) -> Result<Box<Plan>, Error> {
72 let state = SessionStateBuilder::new()
73 .with_config(SessionConfig::new())
74 .with_runtime_env(Arc::new(RuntimeEnv::default()))
75 .with_default_features()
76 .with_serializer_registry(Arc::new(serializer))
77 .build();
78 to_substrait_plan(plan, &state).context(EncodeDfPlanSnafu)
79 }
80}