common_query/
logical_plan.rs1mod expr;
16
17use std::sync::Arc;
18
19use api::v1::TableName;
20use datafusion::catalog::CatalogProviderList;
21use datafusion::error::Result as DatafusionResult;
22use datafusion::logical_expr::{LogicalPlan, LogicalPlanBuilder};
23use datafusion_common::{Column, TableReference};
24use datafusion_expr::dml::InsertOp;
25use datafusion_expr::{DmlStatement, TableSource, WriteOp, col};
26pub use expr::{build_filter_from_timestamp, build_same_type_ts_filter};
27
28use crate::error::Result;
29
30pub fn rename_logical_plan_columns(
34 enable_ident_normalization: bool,
35 plan: LogicalPlan,
36 renames: Vec<(&str, &str)>,
37) -> DatafusionResult<LogicalPlan> {
38 let mut projection = Vec::with_capacity(renames.len());
39
40 for (old_name, new_name) in renames {
41 let old_column: Column = if enable_ident_normalization {
42 Column::from_qualified_name(old_name)
43 } else {
44 Column::from_qualified_name_ignore_case(old_name)
45 };
46
47 let (qualifier_rename, field_rename) =
48 plan.schema().qualified_field_from_column(&old_column)?;
49
50 for (qualifier, field) in plan.schema().iter() {
51 if qualifier.eq(&qualifier_rename) && field.as_ref() == field_rename {
52 projection.push(col(Column::from((qualifier, field))).alias(new_name));
53 }
54 }
55 }
56
57 LogicalPlanBuilder::from(plan).project(projection)?.build()
58}
59
60pub fn breakup_insert_plan(
69 plan: &LogicalPlan,
70 default_catalog: &str,
71 default_schema: &str,
72) -> Option<(TableName, Arc<LogicalPlan>)> {
73 if let LogicalPlan::Dml(dml) = plan {
74 if dml.op != WriteOp::Insert(InsertOp::Append) {
75 return None;
76 }
77 let table_name = &dml.table_name;
78 let table_name = match table_name {
79 TableReference::Bare { table } => TableName {
80 catalog_name: default_catalog.to_string(),
81 schema_name: default_schema.to_string(),
82 table_name: table.to_string(),
83 },
84 TableReference::Partial { schema, table } => TableName {
85 catalog_name: default_catalog.to_string(),
86 schema_name: schema.to_string(),
87 table_name: table.to_string(),
88 },
89 TableReference::Full {
90 catalog,
91 schema,
92 table,
93 } => TableName {
94 catalog_name: catalog.to_string(),
95 schema_name: schema.to_string(),
96 table_name: table.to_string(),
97 },
98 };
99 let logical_plan = dml.input.clone();
100 Some((table_name, logical_plan))
101 } else {
102 None
103 }
104}
105
106pub fn add_insert_to_logical_plan(
108 table_name: TableName,
109 target: Arc<dyn TableSource>,
110 input: LogicalPlan,
111) -> Result<LogicalPlan> {
112 let table_name = TableReference::Full {
113 catalog: table_name.catalog_name.into(),
114 schema: table_name.schema_name.into(),
115 table: table_name.table_name.into(),
116 };
117
118 let plan = LogicalPlan::Dml(DmlStatement::new(
119 table_name,
120 target,
121 WriteOp::Insert(InsertOp::Append),
122 Arc::new(input),
123 ));
124 let plan = plan.recompute_schema()?;
125 Ok(plan)
126}
127
128#[async_trait::async_trait]
130pub trait SubstraitPlanDecoder {
131 async fn decode(
138 &self,
139 message: bytes::Bytes,
140 catalog_list: Arc<dyn CatalogProviderList>,
141 optimize: bool,
142 ) -> Result<LogicalPlan>;
143}
144
145pub type SubstraitPlanDecoderRef = Arc<dyn SubstraitPlanDecoder + Send + Sync>;
146
147#[cfg(test)]
148mod tests {
149 use std::sync::Arc;
150
151 use datafusion_expr::builder::LogicalTableSource;
152 use datafusion_expr::lit;
153 use datatypes::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
154
155 use super::*;
156
157 fn mock_plan() -> LogicalPlan {
158 let schema = Schema::new(vec![
159 Field::new("id", DataType::Int32, true),
160 Field::new("name", DataType::Utf8, true),
161 ]);
162 let table_source = LogicalTableSource::new(SchemaRef::new(schema));
163
164 let projection = None;
165
166 let builder =
167 LogicalPlanBuilder::scan("person", Arc::new(table_source), projection).unwrap();
168
169 builder
170 .filter(col("id").gt(lit(500)))
171 .unwrap()
172 .build()
173 .unwrap()
174 }
175
176 #[test]
177 fn test_rename_logical_plan_columns() {
178 let plan = mock_plan();
179 let new_plan =
180 rename_logical_plan_columns(true, plan, vec![("id", "a"), ("name", "b")]).unwrap();
181
182 assert_eq!(
183 r#"
184Projection: person.id AS a, person.name AS b
185 Filter: person.id > Int32(500)
186 TableScan: person"#,
187 format!("\n{}", new_plan)
188 );
189 }
190}