common_query/
logical_plan.rs1mod expr;
16
17use std::sync::Arc;
18
19use api::v1::TableName;
20use datafusion::catalog::CatalogProviderList;
21use datafusion::error::Result as DatafusionResult;
22use datafusion::logical_expr::{LogicalPlan, LogicalPlanBuilder};
23use datafusion_common::{Column, TableReference};
24use datafusion_expr::dml::InsertOp;
25use datafusion_expr::{col, DmlStatement, TableSource, WriteOp};
26pub use expr::{build_filter_from_timestamp, build_same_type_ts_filter};
27use snafu::ResultExt;
28
29use crate::error::{GeneralDataFusionSnafu, Result};
30
31pub fn rename_logical_plan_columns(
35 enable_ident_normalization: bool,
36 plan: LogicalPlan,
37 renames: Vec<(&str, &str)>,
38) -> DatafusionResult<LogicalPlan> {
39 let mut projection = Vec::with_capacity(renames.len());
40
41 for (old_name, new_name) in renames {
42 let old_column: Column = if enable_ident_normalization {
43 Column::from_qualified_name(old_name)
44 } else {
45 Column::from_qualified_name_ignore_case(old_name)
46 };
47
48 let (qualifier_rename, field_rename) =
49 plan.schema().qualified_field_from_column(&old_column)?;
50
51 for (qualifier, field) in plan.schema().iter() {
52 if qualifier.eq(&qualifier_rename) && field.as_ref() == field_rename {
53 projection.push(col(Column::from((qualifier, field))).alias(new_name));
54 }
55 }
56 }
57
58 LogicalPlanBuilder::from(plan).project(projection)?.build()
59}
60
61pub fn breakup_insert_plan(
70 plan: &LogicalPlan,
71 default_catalog: &str,
72 default_schema: &str,
73) -> Option<(TableName, Arc<LogicalPlan>)> {
74 if let LogicalPlan::Dml(dml) = plan {
75 if dml.op != WriteOp::Insert(InsertOp::Append) {
76 return None;
77 }
78 let table_name = &dml.table_name;
79 let table_name = match table_name {
80 TableReference::Bare { table } => TableName {
81 catalog_name: default_catalog.to_string(),
82 schema_name: default_schema.to_string(),
83 table_name: table.to_string(),
84 },
85 TableReference::Partial { schema, table } => TableName {
86 catalog_name: default_catalog.to_string(),
87 schema_name: schema.to_string(),
88 table_name: table.to_string(),
89 },
90 TableReference::Full {
91 catalog,
92 schema,
93 table,
94 } => TableName {
95 catalog_name: catalog.to_string(),
96 schema_name: schema.to_string(),
97 table_name: table.to_string(),
98 },
99 };
100 let logical_plan = dml.input.clone();
101 Some((table_name, logical_plan))
102 } else {
103 None
104 }
105}
106
107pub fn add_insert_to_logical_plan(
109 table_name: TableName,
110 target: Arc<dyn TableSource>,
111 input: LogicalPlan,
112) -> Result<LogicalPlan> {
113 let table_name = TableReference::Full {
114 catalog: table_name.catalog_name.into(),
115 schema: table_name.schema_name.into(),
116 table: table_name.table_name.into(),
117 };
118
119 let plan = LogicalPlan::Dml(DmlStatement::new(
120 table_name,
121 target,
122 WriteOp::Insert(InsertOp::Append),
123 Arc::new(input),
124 ));
125 let plan = plan.recompute_schema().context(GeneralDataFusionSnafu)?;
126 Ok(plan)
127}
128
129#[async_trait::async_trait]
131pub trait SubstraitPlanDecoder {
132 async fn decode(
139 &self,
140 message: bytes::Bytes,
141 catalog_list: Arc<dyn CatalogProviderList>,
142 optimize: bool,
143 ) -> Result<LogicalPlan>;
144}
145
146pub type SubstraitPlanDecoderRef = Arc<dyn SubstraitPlanDecoder + Send + Sync>;
147
148#[cfg(test)]
149mod tests {
150 use std::sync::Arc;
151
152 use datafusion_expr::builder::LogicalTableSource;
153 use datafusion_expr::lit;
154 use datatypes::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
155
156 use super::*;
157
158 fn mock_plan() -> LogicalPlan {
159 let schema = Schema::new(vec![
160 Field::new("id", DataType::Int32, true),
161 Field::new("name", DataType::Utf8, true),
162 ]);
163 let table_source = LogicalTableSource::new(SchemaRef::new(schema));
164
165 let projection = None;
166
167 let builder =
168 LogicalPlanBuilder::scan("person", Arc::new(table_source), projection).unwrap();
169
170 builder
171 .filter(col("id").gt(lit(500)))
172 .unwrap()
173 .build()
174 .unwrap()
175 }
176
177 #[test]
178 fn test_rename_logical_plan_columns() {
179 let plan = mock_plan();
180 let new_plan =
181 rename_logical_plan_columns(true, plan, vec![("id", "a"), ("name", "b")]).unwrap();
182
183 assert_eq!(
184 r#"
185Projection: person.id AS a, person.name AS b
186 Filter: person.id > Int32(500)
187 TableScan: person"#,
188 format!("\n{}", new_plan)
189 );
190 }
191}