substrait/
df_substrait.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use async_trait::async_trait;
18use bytes::{Buf, Bytes, BytesMut};
19use datafusion::execution::context::SessionState;
20use datafusion::execution::runtime_env::RuntimeEnv;
21use datafusion::execution::SessionStateBuilder;
22use datafusion::prelude::SessionConfig;
23use datafusion_expr::LogicalPlan;
24use datafusion_substrait::logical_plan::consumer::from_substrait_plan;
25use datafusion_substrait::logical_plan::producer::to_substrait_plan;
26use datafusion_substrait::substrait::proto::Plan;
27use prost::Message;
28use snafu::ResultExt;
29
30use crate::error::{DecodeDfPlanSnafu, DecodeRelSnafu, EncodeDfPlanSnafu, EncodeRelSnafu, Error};
31use crate::{SerializerRegistry, SubstraitPlan};
32
33pub struct DFLogicalSubstraitConvertor;
34
35#[async_trait]
36impl SubstraitPlan for DFLogicalSubstraitConvertor {
37    type Error = Error;
38
39    type Plan = LogicalPlan;
40
41    async fn decode<B: Buf + Send>(
42        &self,
43        message: B,
44        state: SessionState,
45    ) -> Result<Self::Plan, Self::Error> {
46        let plan = Plan::decode(message).context(DecodeRelSnafu)?;
47        let df_plan = from_substrait_plan(&state, &plan)
48            .await
49            .context(DecodeDfPlanSnafu)?;
50        Ok(df_plan)
51    }
52
53    fn encode(
54        &self,
55        plan: &Self::Plan,
56        serializer: impl SerializerRegistry + 'static,
57    ) -> Result<Bytes, Self::Error> {
58        let mut buf = BytesMut::new();
59        let substrait_plan = self.to_sub_plan(plan, serializer)?;
60        substrait_plan.encode(&mut buf).context(EncodeRelSnafu)?;
61
62        Ok(buf.freeze())
63    }
64}
65
66impl DFLogicalSubstraitConvertor {
67    pub fn to_sub_plan(
68        &self,
69        plan: &LogicalPlan,
70        serializer: impl SerializerRegistry + 'static,
71    ) -> Result<Box<Plan>, Error> {
72        let state = SessionStateBuilder::new()
73            .with_config(SessionConfig::new())
74            .with_runtime_env(Arc::new(RuntimeEnv::default()))
75            .with_default_features()
76            .with_serializer_registry(Arc::new(serializer))
77            .build();
78        to_substrait_plan(plan, &state).context(EncodeDfPlanSnafu)
79    }
80}