common_query/
logical_plan.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod expr;
16
17use std::sync::Arc;
18
19use api::v1::TableName;
20use datafusion::catalog::CatalogProviderList;
21use datafusion::error::Result as DatafusionResult;
22use datafusion::logical_expr::{LogicalPlan, LogicalPlanBuilder};
23use datafusion_common::{Column, TableReference};
24use datafusion_expr::dml::InsertOp;
25use datafusion_expr::{DmlStatement, TableSource, WriteOp, col};
26pub use expr::{build_filter_from_timestamp, build_same_type_ts_filter};
27
28use crate::error::Result;
29
30/// Rename columns by applying a new projection. Returns an error if the column to be
31/// renamed does not exist. The `renames` parameter is a `Vector` with elements
32/// in the form of `(old_name, new_name)`.
33pub fn rename_logical_plan_columns(
34    enable_ident_normalization: bool,
35    plan: LogicalPlan,
36    renames: Vec<(&str, &str)>,
37) -> DatafusionResult<LogicalPlan> {
38    let mut projection = Vec::with_capacity(renames.len());
39
40    for (old_name, new_name) in renames {
41        let old_column: Column = if enable_ident_normalization {
42            Column::from_qualified_name(old_name)
43        } else {
44            Column::from_qualified_name_ignore_case(old_name)
45        };
46
47        let (qualifier_rename, field_rename) =
48            plan.schema().qualified_field_from_column(&old_column)?;
49
50        for (qualifier, field) in plan.schema().iter() {
51            if qualifier.eq(&qualifier_rename) && field.as_ref() == field_rename {
52                projection.push(col(Column::from((qualifier, field))).alias(new_name));
53            }
54        }
55    }
56
57    LogicalPlanBuilder::from(plan).project(projection)?.build()
58}
59
60/// Convert a insert into logical plan to an (table_name, logical_plan)
61/// where table_name is the name of the table to insert into.
62/// logical_plan is the plan to be executed.
63///
64/// if input logical plan is not `insert into table_name <input>`, return None
65///
66/// Returned TableName will use provided catalog and schema if not specified in the logical plan,
67/// if table scan in logical plan have full table name, will **NOT** override it.
68pub fn breakup_insert_plan(
69    plan: &LogicalPlan,
70    default_catalog: &str,
71    default_schema: &str,
72) -> Option<(TableName, Arc<LogicalPlan>)> {
73    if let LogicalPlan::Dml(dml) = plan {
74        if dml.op != WriteOp::Insert(InsertOp::Append) {
75            return None;
76        }
77        let table_name = &dml.table_name;
78        let table_name = match table_name {
79            TableReference::Bare { table } => TableName {
80                catalog_name: default_catalog.to_string(),
81                schema_name: default_schema.to_string(),
82                table_name: table.to_string(),
83            },
84            TableReference::Partial { schema, table } => TableName {
85                catalog_name: default_catalog.to_string(),
86                schema_name: schema.to_string(),
87                table_name: table.to_string(),
88            },
89            TableReference::Full {
90                catalog,
91                schema,
92                table,
93            } => TableName {
94                catalog_name: catalog.to_string(),
95                schema_name: schema.to_string(),
96                table_name: table.to_string(),
97            },
98        };
99        let logical_plan = dml.input.clone();
100        Some((table_name, logical_plan))
101    } else {
102        None
103    }
104}
105
106/// create a `insert into table_name <input>` logical plan
107pub fn add_insert_to_logical_plan(
108    table_name: TableName,
109    target: Arc<dyn TableSource>,
110    input: LogicalPlan,
111) -> Result<LogicalPlan> {
112    let table_name = TableReference::Full {
113        catalog: table_name.catalog_name.into(),
114        schema: table_name.schema_name.into(),
115        table: table_name.table_name.into(),
116    };
117
118    let plan = LogicalPlan::Dml(DmlStatement::new(
119        table_name,
120        target,
121        WriteOp::Insert(InsertOp::Append),
122        Arc::new(input),
123    ));
124    let plan = plan.recompute_schema()?;
125    Ok(plan)
126}
127
128/// The datafusion `[LogicalPlan]` decoder.
129#[async_trait::async_trait]
130pub trait SubstraitPlanDecoder {
131    /// Decode the [`LogicalPlan`] from bytes with the [`CatalogProviderList`].
132    /// When `optimize` is true, it will do the optimization for decoded plan.
133    ///
134    /// TODO(dennis): It's not a good design for an API to do many things.
135    /// The `optimize` was introduced because of `query` and `catalog` cyclic dependency issue
136    /// I am happy to refactor it if we have a better solution.
137    async fn decode(
138        &self,
139        message: bytes::Bytes,
140        catalog_list: Arc<dyn CatalogProviderList>,
141        optimize: bool,
142    ) -> Result<LogicalPlan>;
143}
144
145pub type SubstraitPlanDecoderRef = Arc<dyn SubstraitPlanDecoder + Send + Sync>;
146
147#[cfg(test)]
148mod tests {
149    use std::sync::Arc;
150
151    use datafusion_expr::builder::LogicalTableSource;
152    use datafusion_expr::lit;
153    use datatypes::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
154
155    use super::*;
156
157    fn mock_plan() -> LogicalPlan {
158        let schema = Schema::new(vec![
159            Field::new("id", DataType::Int32, true),
160            Field::new("name", DataType::Utf8, true),
161        ]);
162        let table_source = LogicalTableSource::new(SchemaRef::new(schema));
163
164        let projection = None;
165
166        let builder =
167            LogicalPlanBuilder::scan("person", Arc::new(table_source), projection).unwrap();
168
169        builder
170            .filter(col("id").gt(lit(500)))
171            .unwrap()
172            .build()
173            .unwrap()
174    }
175
176    #[test]
177    fn test_rename_logical_plan_columns() {
178        let plan = mock_plan();
179        let new_plan =
180            rename_logical_plan_columns(true, plan, vec![("id", "a"), ("name", "b")]).unwrap();
181
182        assert_eq!(
183            r#"
184Projection: person.id AS a, person.name AS b
185  Filter: person.id > Int32(500)
186    TableScan: person"#,
187            format!("\n{}", new_plan)
188        );
189    }
190}