common_query/
logical_plan.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod expr;
16
17use std::sync::Arc;
18
19use api::v1::TableName;
20use datafusion::catalog::CatalogProviderList;
21use datafusion::error::Result as DatafusionResult;
22use datafusion::logical_expr::{LogicalPlan, LogicalPlanBuilder};
23use datafusion_common::{Column, TableReference};
24use datafusion_expr::dml::InsertOp;
25use datafusion_expr::{col, DmlStatement, TableSource, WriteOp};
26pub use expr::{build_filter_from_timestamp, build_same_type_ts_filter};
27use snafu::ResultExt;
28
29use crate::error::{GeneralDataFusionSnafu, Result};
30
31/// Rename columns by applying a new projection. Returns an error if the column to be
32/// renamed does not exist. The `renames` parameter is a `Vector` with elements
33/// in the form of `(old_name, new_name)`.
34pub fn rename_logical_plan_columns(
35    enable_ident_normalization: bool,
36    plan: LogicalPlan,
37    renames: Vec<(&str, &str)>,
38) -> DatafusionResult<LogicalPlan> {
39    let mut projection = Vec::with_capacity(renames.len());
40
41    for (old_name, new_name) in renames {
42        let old_column: Column = if enable_ident_normalization {
43            Column::from_qualified_name(old_name)
44        } else {
45            Column::from_qualified_name_ignore_case(old_name)
46        };
47
48        let (qualifier_rename, field_rename) =
49            plan.schema().qualified_field_from_column(&old_column)?;
50
51        for (qualifier, field) in plan.schema().iter() {
52            if qualifier.eq(&qualifier_rename) && field.as_ref() == field_rename {
53                projection.push(col(Column::from((qualifier, field))).alias(new_name));
54            }
55        }
56    }
57
58    LogicalPlanBuilder::from(plan).project(projection)?.build()
59}
60
61/// Convert a insert into logical plan to an (table_name, logical_plan)
62/// where table_name is the name of the table to insert into.
63/// logical_plan is the plan to be executed.
64///
65/// if input logical plan is not `insert into table_name <input>`, return None
66///
67/// Returned TableName will use provided catalog and schema if not specified in the logical plan,
68/// if table scan in logical plan have full table name, will **NOT** override it.
69pub fn breakup_insert_plan(
70    plan: &LogicalPlan,
71    default_catalog: &str,
72    default_schema: &str,
73) -> Option<(TableName, Arc<LogicalPlan>)> {
74    if let LogicalPlan::Dml(dml) = plan {
75        if dml.op != WriteOp::Insert(InsertOp::Append) {
76            return None;
77        }
78        let table_name = &dml.table_name;
79        let table_name = match table_name {
80            TableReference::Bare { table } => TableName {
81                catalog_name: default_catalog.to_string(),
82                schema_name: default_schema.to_string(),
83                table_name: table.to_string(),
84            },
85            TableReference::Partial { schema, table } => TableName {
86                catalog_name: default_catalog.to_string(),
87                schema_name: schema.to_string(),
88                table_name: table.to_string(),
89            },
90            TableReference::Full {
91                catalog,
92                schema,
93                table,
94            } => TableName {
95                catalog_name: catalog.to_string(),
96                schema_name: schema.to_string(),
97                table_name: table.to_string(),
98            },
99        };
100        let logical_plan = dml.input.clone();
101        Some((table_name, logical_plan))
102    } else {
103        None
104    }
105}
106
107/// create a `insert into table_name <input>` logical plan
108pub fn add_insert_to_logical_plan(
109    table_name: TableName,
110    target: Arc<dyn TableSource>,
111    input: LogicalPlan,
112) -> Result<LogicalPlan> {
113    let table_name = TableReference::Full {
114        catalog: table_name.catalog_name.into(),
115        schema: table_name.schema_name.into(),
116        table: table_name.table_name.into(),
117    };
118
119    let plan = LogicalPlan::Dml(DmlStatement::new(
120        table_name,
121        target,
122        WriteOp::Insert(InsertOp::Append),
123        Arc::new(input),
124    ));
125    let plan = plan.recompute_schema().context(GeneralDataFusionSnafu)?;
126    Ok(plan)
127}
128
/// Decodes a serialized plan (Substrait, as the trait name indicates) into a datafusion
/// [`LogicalPlan`].
#[async_trait::async_trait]
pub trait SubstraitPlanDecoder {
    /// Decode the [`LogicalPlan`] from bytes with the [`CatalogProviderList`].
    /// When `optimize` is true, it will do the optimization for decoded plan.
    ///
    /// - `message`: the encoded plan bytes.
    /// - `catalog_list`: the catalogs handed to the decoder while it rebuilds the plan.
    /// - `optimize`: whether the decoded plan should also be optimized before returning.
    ///
    /// TODO(dennis): It's not a good design for an API to do many things.
    /// The `optimize` was introduced because of `query` and `catalog` cyclic dependency issue
    /// I am happy to refactor it if we have a better solution.
    async fn decode(
        &self,
        message: bytes::Bytes,
        catalog_list: Arc<dyn CatalogProviderList>,
        optimize: bool,
    ) -> Result<LogicalPlan>;
}

/// Shared, thread-safe (`Send + Sync`) handle to a [`SubstraitPlanDecoder`] implementation.
pub type SubstraitPlanDecoderRef = Arc<dyn SubstraitPlanDecoder + Send + Sync>;
147
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use datafusion_expr::builder::LogicalTableSource;
    use datafusion_expr::lit;
    use datatypes::arrow::datatypes::{DataType, Field, Schema, SchemaRef};

    use super::*;

    /// Schema of the mock `person` table: nullable `id: Int32` and `name: Utf8`.
    fn mock_schema() -> SchemaRef {
        SchemaRef::new(Schema::new(vec![
            Field::new("id", DataType::Int32, true),
            Field::new("name", DataType::Utf8, true),
        ]))
    }

    /// `person` scan filtered by `id > 500`.
    fn mock_plan() -> LogicalPlan {
        let table_source = LogicalTableSource::new(mock_schema());

        let projection = None;

        let builder =
            LogicalPlanBuilder::scan("person", Arc::new(table_source), projection).unwrap();

        builder
            .filter(col("id").gt(lit(500)))
            .unwrap()
            .build()
            .unwrap()
    }

    /// DML plan writing `mock_plan()` into `table_ref` with the given operation.
    fn mock_dml_plan(table_ref: TableReference, op: WriteOp) -> LogicalPlan {
        LogicalPlan::Dml(DmlStatement::new(
            table_ref,
            Arc::new(LogicalTableSource::new(mock_schema())),
            op,
            Arc::new(mock_plan()),
        ))
    }

    fn table_name(catalog: &str, schema: &str, table: &str) -> TableName {
        TableName {
            catalog_name: catalog.to_string(),
            schema_name: schema.to_string(),
            table_name: table.to_string(),
        }
    }

    #[test]
    fn test_rename_logical_plan_columns() {
        let plan = mock_plan();
        let new_plan =
            rename_logical_plan_columns(true, plan, vec![("id", "a"), ("name", "b")]).unwrap();

        assert_eq!(
            r#"
Projection: person.id AS a, person.name AS b
  Filter: person.id > Int32(500)
    TableScan: person"#,
            format!("\n{}", new_plan)
        );
    }

    #[test]
    fn test_rename_logical_plan_columns_keeps_only_listed_columns() {
        let new_plan = rename_logical_plan_columns(true, mock_plan(), vec![("name", "b")]).unwrap();

        assert_eq!(
            r#"
Projection: person.name AS b
  Filter: person.id > Int32(500)
    TableScan: person"#,
            format!("\n{}", new_plan)
        );
    }

    #[test]
    fn test_rename_logical_plan_columns_unknown_column() {
        let result = rename_logical_plan_columns(true, mock_plan(), vec![("not_exist", "x")]);
        assert!(result.is_err());
    }

    #[test]
    fn test_insert_plan_round_trip() {
        let expected = table_name("my_catalog", "my_schema", "person");
        let plan = add_insert_to_logical_plan(
            expected.clone(),
            Arc::new(LogicalTableSource::new(mock_schema())),
            mock_plan(),
        )
        .unwrap();

        // A fully-qualified reference must not be overridden by the defaults.
        let (decoded, input) = breakup_insert_plan(&plan, "greptime", "public").unwrap();
        assert_eq!(expected, decoded);
        assert_eq!(format!("{}", mock_plan()), format!("{}", input));
    }

    #[test]
    fn test_breakup_insert_plan_fills_defaults() {
        let bare = mock_dml_plan(
            TableReference::bare("person"),
            WriteOp::Insert(InsertOp::Append),
        );
        let (name, _) = breakup_insert_plan(&bare, "greptime", "public").unwrap();
        assert_eq!(table_name("greptime", "public", "person"), name);

        let partial = mock_dml_plan(
            TableReference::partial("my_schema", "person"),
            WriteOp::Insert(InsertOp::Append),
        );
        let (name, _) = breakup_insert_plan(&partial, "greptime", "public").unwrap();
        assert_eq!(table_name("greptime", "my_schema", "person"), name);
    }

    #[test]
    fn test_breakup_insert_plan_rejects_non_append() {
        let overwrite = mock_dml_plan(
            TableReference::bare("person"),
            WriteOp::Insert(InsertOp::Overwrite),
        );
        assert!(breakup_insert_plan(&overwrite, "greptime", "public").is_none());

        // Not a DML plan at all.
        assert!(breakup_insert_plan(&mock_plan(), "greptime", "public").is_none());
    }
}