substrait/
df_substrait.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::sync::Arc;
16
17use async_trait::async_trait;
18use bytes::{Buf, Bytes, BytesMut};
19use common_function::aggrs::aggr_wrapper::fix_order::{
20    FixStateUdafOrderingAnalyzer, UnFixStateUdafOrderingAnalyzer,
21};
22use datafusion::execution::SessionStateBuilder;
23use datafusion::execution::context::SessionState;
24use datafusion::execution::runtime_env::RuntimeEnv;
25use datafusion::optimizer::AnalyzerRule;
26use datafusion::prelude::SessionConfig;
27use datafusion_expr::LogicalPlan;
28use datafusion_substrait::logical_plan::consumer::from_substrait_plan;
29use datafusion_substrait::logical_plan::producer::to_substrait_plan;
30use datafusion_substrait::substrait::proto::Plan;
31use prost::Message;
32use snafu::ResultExt;
33
34use crate::error::{DecodeDfPlanSnafu, DecodeRelSnafu, EncodeDfPlanSnafu, EncodeRelSnafu, Error};
35use crate::{SerializerRegistry, SubstraitPlan};
36
/// Stateless converter between DataFusion logical plans and protobuf-encoded Substrait plans.
///
/// It carries no data, so it is cheap to copy and can be used directly as a value
/// (`DFLogicalSubstraitConvertor`) or via `Default::default()`.
#[derive(Debug, Clone, Copy, Default)]
pub struct DFLogicalSubstraitConvertor;
38
39#[async_trait]
40impl SubstraitPlan for DFLogicalSubstraitConvertor {
41    type Error = Error;
42
43    type Plan = LogicalPlan;
44
45    async fn decode<B: Buf + Send>(
46        &self,
47        message: B,
48        state: SessionState,
49    ) -> Result<Self::Plan, Self::Error> {
50        let plan = Plan::decode(message).context(DecodeRelSnafu)?;
51        let df_plan = from_substrait_plan(&state, &plan)
52            .await
53            .context(DecodeDfPlanSnafu)?;
54        let df_plan = FixStateUdafOrderingAnalyzer {}
55            .analyze(df_plan, state.config_options())
56            .context(DecodeDfPlanSnafu)?;
57        Ok(df_plan)
58    }
59
60    fn encode(
61        &self,
62        plan: &Self::Plan,
63        serializer: impl SerializerRegistry + 'static,
64    ) -> Result<Bytes, Self::Error> {
65        let plan = UnFixStateUdafOrderingAnalyzer {}
66            .analyze(plan.clone(), &Default::default())
67            .context(EncodeDfPlanSnafu)?;
68        let mut buf = BytesMut::new();
69        let substrait_plan = self.to_sub_plan(&plan, serializer)?;
70        substrait_plan.encode(&mut buf).context(EncodeRelSnafu)?;
71
72        Ok(buf.freeze())
73    }
74}
75
76impl DFLogicalSubstraitConvertor {
77    pub fn to_sub_plan(
78        &self,
79        plan: &LogicalPlan,
80        serializer: impl SerializerRegistry + 'static,
81    ) -> Result<Box<Plan>, Error> {
82        let state = SessionStateBuilder::new()
83            .with_config(SessionConfig::new())
84            .with_runtime_env(Arc::new(RuntimeEnv::default()))
85            .with_default_features()
86            .with_serializer_registry(Arc::new(serializer))
87            .build();
88        to_substrait_plan(plan, &state).context(EncodeDfPlanSnafu)
89    }
90}