query/dist_plan/analyzer/
utils.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
16use std::sync::Arc;
17
18use datafusion::error::Result as DfResult;
19use datafusion_common::Column;
20use datafusion_common::tree_node::{Transformed, TreeNode as _};
21use datafusion_expr::expr::Alias;
22use datafusion_expr::{Expr, Extension, LogicalPlan};
23
24use crate::dist_plan::merge_sort::MergeSortLogicalPlan;
25use crate::plan::ExtractExpr as _;
26
27fn rewrite_column(
28    mapping: &BTreeMap<Column, BTreeSet<Column>>,
29    original_node: &LogicalPlan,
30    alias_node: &LogicalPlan,
31) -> impl Fn(Expr) -> DfResult<Transformed<Expr>> {
32    move |e: Expr| {
33        if let Expr::Column(col) = e {
34            if let Some(aliased_cols) = mapping.get(&col) {
35                // if multiple alias is available, just use first one
36                if let Some(aliased_col) = aliased_cols.iter().next() {
37                    Ok(Transformed::yes(Expr::Column(aliased_col.clone())))
38                } else {
39                    Err(datafusion_common::DataFusionError::Internal(format!(
40                        "PlanRewriter: expand: column {col} from {original_node}\n has empty alias set in plan: {alias_node}\n but expect at least one alias",
41                    )))
42                }
43            } else {
44                Err(datafusion_common::DataFusionError::Internal(format!(
45                    "PlanRewriter: expand: column {col} from {original_node}\n has no alias in plan: {alias_node}",
46                )))
47            }
48        } else {
49            Ok(Transformed::no(e))
50        }
51    }
52}
53
54/// Rewrite the expressions of the given merge sort plan from original columns(at merge sort's input plan) to aliased columns at the given aliased node
55pub fn rewrite_merge_sort_exprs(
56    merge_sort: &MergeSortLogicalPlan,
57    aliased_node: &LogicalPlan,
58) -> DfResult<LogicalPlan> {
59    let merge_sort = LogicalPlan::Extension(Extension {
60        node: Arc::new(merge_sort.clone()),
61    });
62
63    // tracking alias for sort exprs,
64    let sort_input = merge_sort.inputs().first().cloned().ok_or_else(|| {
65        datafusion_common::DataFusionError::Internal(format!(
66            "PlanRewriter: expand: merge sort stage has no input: {merge_sort}"
67        ))
68    })?;
69    let sort_exprs = merge_sort.expressions_consider_join();
70    let column_refs = sort_exprs
71        .iter()
72        .flat_map(|e| e.column_refs().into_iter().cloned())
73        .collect::<BTreeSet<_>>();
74    let column_alias_mapping = aliased_columns_for(&column_refs, aliased_node, Some(sort_input))?;
75    let aliased_sort_exprs = sort_exprs
76        .into_iter()
77        .map(|e| {
78            e.transform(rewrite_column(
79                &column_alias_mapping,
80                &merge_sort,
81                aliased_node,
82            ))
83        })
84        .map(|e| e.map(|e| e.data))
85        .collect::<DfResult<Vec<_>>>()?;
86    let new_merge_sort = merge_sort.with_new_exprs(
87        aliased_sort_exprs,
88        merge_sort.inputs().into_iter().cloned().collect(),
89    )?;
90    Ok(new_merge_sort)
91}
92
93/// Return all the original columns(at original node) for the given aliased columns at the aliased node
94///
95/// if `original_node` is None, it means original columns are from leaf node
96///
97/// Return value use `BTreeMap` to have deterministic order for choose first alias when multiple alias exist
98#[allow(unused)]
99pub fn original_column_for(
100    aliased_columns: &BTreeSet<Column>,
101    aliased_node: LogicalPlan,
102    original_node: Option<Arc<LogicalPlan>>,
103) -> DfResult<BTreeMap<Column, Column>> {
104    let schema_cols: BTreeSet<Column> = aliased_node.schema().columns().iter().cloned().collect();
105    let cur_aliases: BTreeMap<Column, Column> = aliased_columns
106        .iter()
107        .filter(|c| schema_cols.contains(c))
108        .map(|c| (c.clone(), c.clone()))
109        .collect();
110
111    if cur_aliases.is_empty() {
112        return Ok(BTreeMap::new());
113    }
114
115    original_column_for_inner(cur_aliases, &aliased_node, &original_node)
116}
117
118fn original_column_for_inner(
119    mut cur_aliases: BTreeMap<Column, Column>,
120    node: &LogicalPlan,
121    original_node: &Option<Arc<LogicalPlan>>,
122) -> DfResult<BTreeMap<Column, Column>> {
123    let mut current_node = node;
124
125    loop {
126        // Base case: check if we've reached the target node
127        if let Some(original_node) = original_node
128            && *current_node == **original_node
129        {
130            return Ok(cur_aliases);
131        } else if current_node.inputs().is_empty() {
132            // leaf node reached
133            return Ok(cur_aliases);
134        }
135
136        // Validate node has exactly one child
137        if current_node.inputs().len() != 1 {
138            return Err(datafusion::error::DataFusionError::Internal(format!(
139                "only accept plan with at most one child, found: {}",
140                current_node
141            )));
142        }
143
144        // Get alias layer and update aliases
145        let layer = get_alias_layer_from_node(current_node)?;
146        let mut new_aliases = BTreeMap::new();
147        for (start_alias, cur_alias) in cur_aliases {
148            if let Some(old_column) = layer.get_old_from_new(cur_alias.clone()) {
149                new_aliases.insert(start_alias, old_column);
150            }
151        }
152
153        // Move to child node and continue iteration
154        cur_aliases = new_aliases;
155        current_node = current_node.inputs()[0];
156    }
157}
158
159/// Return all the aliased columns(at aliased node) for the given original columns(at original node)
160///
161/// if `original_node` is None, it means original columns are from leaf node
162///
163/// Return value use `BTreeMap` to have deterministic order for choose first alias when multiple alias exist
164pub fn aliased_columns_for(
165    original_columns: &BTreeSet<Column>,
166    aliased_node: &LogicalPlan,
167    original_node: Option<&LogicalPlan>,
168) -> DfResult<BTreeMap<Column, BTreeSet<Column>>> {
169    let initial_aliases: BTreeMap<Column, BTreeSet<Column>> = {
170        if let Some(original) = &original_node {
171            let schema_cols: BTreeSet<Column> = original.schema().columns().into_iter().collect();
172            original_columns
173                .iter()
174                .filter(|c| schema_cols.contains(c))
175                .map(|c| (c.clone(), [c.clone()].into()))
176                .collect()
177        } else {
178            original_columns
179                .iter()
180                .map(|c| (c.clone(), [c.clone()].into()))
181                .collect()
182        }
183    };
184
185    if initial_aliases.is_empty() {
186        return Ok(BTreeMap::new());
187    }
188
189    aliased_columns_for_inner(initial_aliases, aliased_node, original_node)
190}
191
192fn aliased_columns_for_inner(
193    cur_aliases: BTreeMap<Column, BTreeSet<Column>>,
194    node: &LogicalPlan,
195    original_node: Option<&LogicalPlan>,
196) -> DfResult<BTreeMap<Column, BTreeSet<Column>>> {
197    // First, collect the path from current node to the target node
198    let mut path = Vec::new();
199    let mut current_node = node;
200
201    // Descend to the target node, collecting nodes along the way
202    loop {
203        // Base case: check if we've reached the target node
204        if let Some(original_node) = original_node
205            && *current_node == *original_node
206        {
207            break;
208        } else if current_node.inputs().is_empty() {
209            // leaf node reached
210            break;
211        }
212
213        // Validate node has exactly one child
214        if current_node.inputs().len() != 1 {
215            return Err(datafusion::error::DataFusionError::Internal(format!(
216                "only accept plan with at most one child, found: {}",
217                current_node
218            )));
219        }
220
221        // Add current node to path and move to child
222        path.push(current_node);
223        current_node = current_node.inputs()[0];
224    }
225
226    // Now apply alias layers in reverse order (from original to aliased)
227    let mut result = cur_aliases;
228    for &node_in_path in path.iter().rev() {
229        let layer = get_alias_layer_from_node(node_in_path)?;
230        let mut new_aliases = BTreeMap::new();
231        for (original_column, cur_alias_set) in result {
232            let mut new_alias_set = BTreeSet::new();
233            for cur_alias in cur_alias_set {
234                new_alias_set.extend(layer.get_new_from_old(cur_alias.clone()));
235            }
236            if !new_alias_set.is_empty() {
237                new_aliases.insert(original_column, new_alias_set);
238            }
239        }
240        result = new_aliases;
241    }
242
243    Ok(result)
244}
245
246/// Return a mapping of original column to all the aliased columns in current node of the plan
247/// TODO(discord9): also support merge scan node
248fn get_alias_layer_from_node(node: &LogicalPlan) -> DfResult<AliasLayer> {
249    match node {
250        LogicalPlan::Projection(proj) => Ok(get_alias_layer_from_exprs(&proj.expr)),
251        LogicalPlan::Aggregate(aggr) => Ok(get_alias_layer_from_exprs(&aggr.group_expr)),
252        LogicalPlan::SubqueryAlias(subquery_alias) => {
253            let mut layer = AliasLayer::default();
254            let old_columns = subquery_alias.input.schema().columns();
255            for old_column in old_columns {
256                let new_column = Column::new(
257                    Some(subquery_alias.alias.clone()),
258                    old_column.name().to_string(),
259                );
260                // mapping from old_column to new_column
261                layer.insert_alias(old_column, [new_column].into());
262            }
263            Ok(layer)
264        }
265        LogicalPlan::TableScan(scan) => {
266            let columns = scan.projected_schema.columns();
267            let mut layer = AliasLayer::default();
268            for col in columns {
269                layer.insert_alias(col.clone(), [col.clone()].into());
270            }
271            Ok(layer)
272        }
273        _ => {
274            let input_schema = node
275                .inputs()
276                .first()
277                .ok_or_else(|| {
278                    datafusion::error::DataFusionError::Internal(format!(
279                        "only accept plan with at most one child, found: {}",
280                        node
281                    ))
282                })?
283                .schema();
284            let output_schema = node.schema();
285            // only accept at most one child plan, and if not one of the above nodes,
286            // also shouldn't modify the schema or else alias scope tracker can't support them
287            if node.inputs().len() > 1 {
288                Err(datafusion::error::DataFusionError::Internal(format!(
289                    "only accept plan with at most one child, found: {}",
290                    node
291                )))
292            } else if node.inputs().len() == 1 {
293                if input_schema != output_schema {
294                    let input_columns = input_schema.columns();
295                    let all_input_is_in_output = input_columns
296                        .iter()
297                        .all(|c| output_schema.is_column_from_schema(c));
298                    if all_input_is_in_output {
299                        // all input is in output, so it's just adding some columns, we can do identity mapping for input columns
300                        let mut layer = AliasLayer::default();
301                        for col in input_columns {
302                            layer.insert_alias(col.clone(), [col.clone()].into());
303                        }
304                        Ok(layer)
305                    } else {
306                        // otherwise use the intersection of input and output
307                        // TODO(discord9): maybe just make this case unsupported for now?
308                        common_telemetry::debug!(
309                            "Might be unsupported plan for alias tracking, track alias anyway: {}",
310                            node
311                        );
312                        let input_columns = input_schema.columns();
313                        let output_columns =
314                            output_schema.columns().into_iter().collect::<HashSet<_>>();
315                        let common_columns: HashSet<Column> = input_columns
316                            .iter()
317                            .filter(|c| output_columns.contains(c))
318                            .cloned()
319                            .collect();
320
321                        let mut layer = AliasLayer::default();
322                        for col in &common_columns {
323                            layer.insert_alias(col.clone(), [col.clone()].into());
324                        }
325                        Ok(layer)
326                    }
327                } else {
328                    // identity mapping
329                    let mut layer = AliasLayer::default();
330                    for col in output_schema.columns() {
331                        layer.insert_alias(col.clone(), [col.clone()].into());
332                    }
333                    Ok(layer)
334                }
335            } else {
336                // unknown plan with no input, error msg
337                Err(datafusion::error::DataFusionError::Internal(format!(
338                    "Unsupported plan with no input: {}",
339                    node
340                )))
341            }
342        }
343    }
344}
345
346fn get_alias_layer_from_exprs(exprs: &[Expr]) -> AliasLayer {
347    let mut alias_mapping: HashMap<Column, HashSet<Column>> = HashMap::new();
348    for expr in exprs {
349        if let Expr::Alias(alias) = expr {
350            if let Some(column) = get_alias_original_column(alias) {
351                alias_mapping
352                    .entry(column.clone())
353                    .or_default()
354                    .insert(Column::new(alias.relation.clone(), alias.name.clone()));
355            }
356        } else if let Expr::Column(column) = expr {
357            // identity mapping
358            alias_mapping
359                .entry(column.clone())
360                .or_default()
361                .insert(column.clone());
362        }
363    }
364    let mut layer = AliasLayer::default();
365    for (old_column, new_columns) in alias_mapping {
366        layer.insert_alias(old_column, new_columns);
367    }
368    layer
369}
370
371#[derive(Default, Debug, Clone)]
372struct AliasLayer {
373    /// for convenient of querying, key is field's name
374    old_to_new: BTreeMap<Column, HashSet<Column>>,
375}
376
377impl AliasLayer {
378    pub fn insert_alias(&mut self, old_column: Column, new_columns: HashSet<Column>) {
379        self.old_to_new
380            .entry(old_column)
381            .or_default()
382            .extend(new_columns);
383    }
384
385    pub fn get_new_from_old(&self, old_column: Column) -> HashSet<Column> {
386        let mut res_cols = HashSet::new();
387        for (old, new_cols) in self.old_to_new.iter() {
388            if old.name() == old_column.name() {
389                match (&old.relation, &old_column.relation) {
390                    (Some(o), Some(c)) => {
391                        if o.resolved_eq(c) {
392                            res_cols.extend(new_cols.clone());
393                        }
394                    }
395                    _ => {
396                        // if any of the two relation is None, meaning not fully qualified, just match name
397                        res_cols.extend(new_cols.clone());
398                    }
399                }
400            }
401        }
402        res_cols
403    }
404
405    pub fn get_old_from_new(&self, new_column: Column) -> Option<Column> {
406        for (old, new_set) in &self.old_to_new {
407            if new_set.iter().any(|n| {
408                if n.name() != new_column.name() {
409                    return false;
410                }
411                match (&n.relation, &new_column.relation) {
412                    (Some(r1), Some(r2)) => r1.resolved_eq(r2),
413                    _ => true,
414                }
415            }) {
416                return Some(old.clone());
417            }
418        }
419        None
420    }
421}
422
423fn get_alias_original_column(alias: &Alias) -> Option<Column> {
424    let mut cur_alias = alias;
425    while let Expr::Alias(inner_alias) = cur_alias.expr.as_ref() {
426        cur_alias = inner_alias;
427    }
428    if let Expr::Column(column) = cur_alias.expr.as_ref() {
429        return Some(column.clone());
430    }
431
432    None
433}
434
435/// Mapping of original column in table to all the alias at current node
436pub type AliasMapping = BTreeMap<String, BTreeSet<Column>>;
437
// Unit tests for the alias trackers above. Each test builds a single-child plan chain on top of a
// `TestTable` scan named "t" and checks column mappings in both directions.
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use common_telemetry::init_default_ut_logging;
    use datafusion::datasource::DefaultTableSource;
    use datafusion::functions_aggregate::min_max::{max, min};
    use datafusion_expr::{LogicalPlanBuilder, col};
    use pretty_assertions::assert_eq;
    use table::table::adapter::DfTableProviderAdapter;

    use super::*;
    use crate::dist_plan::analyzer::test::TestTable;

    /// Build a [`Column`] from a possibly qualified name such as `"t.pk1"` or `"pk1"`.
    fn qcol(name: &str) -> Column {
        Column::from_qualified_name(name)
    }

    #[test]
    fn proj_multi_layered_alias_tracker() {
        // use logging for better debugging
        init_default_ut_logging();
        let test_table = TestTable::table_with_name(0, "t".to_string());
        let table_source = Arc::new(DefaultTableSource::new(Arc::new(
            DfTableProviderAdapter::new(test_table),
        )));
        // scan(t) -> Projection[number, pk3 AS pk1, pk3 AS pk2]
        //         -> Projection[number, pk2 AS pk4, pk1 AS pk5]
        let plan = LogicalPlanBuilder::scan_with_filters("t", table_source, None, vec![])
            .unwrap()
            .project(vec![
                col("number"),
                col("pk3").alias("pk1"),
                col("pk3").alias("pk2"),
            ])
            .unwrap()
            .project(vec![
                col("number"),
                col("pk2").alias("pk4"),
                col("pk1").alias("pk5"),
            ])
            .unwrap()
            .build()
            .unwrap();

        let child = plan.inputs()[0].clone();

        assert_eq!(
            aliased_columns_for(&[qcol("pk1"), qcol("pk2")].into(), &plan, Some(&child)).unwrap(),
            [
                (qcol("pk1"), [qcol("pk5")].into()),
                (qcol("pk2"), [qcol("pk4")].into())
            ]
            .into()
        );

        // columns not in the plan should return empty mapping
        assert_eq!(
            aliased_columns_for(&[qcol("pk1"), qcol("pk2")].into(), &plan, Some(&plan)).unwrap(),
            [].into()
        );

        // t.pk3 is not in `child`'s schema (it is only exposed as pk1/pk2 there)
        assert_eq!(
            aliased_columns_for(&[qcol("t.pk3")].into(), &plan, Some(&child)).unwrap(),
            [].into()
        );

        // with no target node, tracing goes all the way down to the table scan
        assert_eq!(
            original_column_for(&[qcol("pk5"), qcol("pk4")].into(), plan.clone(), None).unwrap(),
            [(qcol("pk5"), qcol("t.pk3")), (qcol("pk4"), qcol("t.pk3"))].into()
        );

        assert_eq!(
            aliased_columns_for(&[qcol("pk3")].into(), &plan, None).unwrap(),
            [(qcol("pk3"), [qcol("pk5"), qcol("pk4")].into())].into()
        );
        assert_eq!(
            original_column_for(&[qcol("pk1"), qcol("pk2")].into(), child.clone(), None).unwrap(),
            [(qcol("pk1"), qcol("t.pk3")), (qcol("pk2"), qcol("t.pk3"))].into()
        );

        assert_eq!(
            aliased_columns_for(&[qcol("pk3")].into(), &child, None).unwrap(),
            [(qcol("pk3"), [qcol("pk1"), qcol("pk2")].into())].into()
        );

        assert_eq!(
            original_column_for(
                &[qcol("pk4"), qcol("pk5")].into(),
                plan.clone(),
                Some(Arc::new(child.clone()))
            )
            .unwrap(),
            [(qcol("pk4"), qcol("pk2")), (qcol("pk5"), qcol("pk1"))].into()
        );
    }

    #[test]
    fn sort_subquery_alias_layered_tracker() {
        let test_table = TestTable::table_with_name(0, "t".to_string());
        let table_source = Arc::new(DefaultTableSource::new(Arc::new(
            DfTableProviderAdapter::new(test_table),
        )));

        // scan(t) -> Sort[t.number] -> SubqueryAlias "a"
        let plan = LogicalPlanBuilder::scan_with_filters("t", table_source, None, vec![])
            .unwrap()
            .sort(vec![col("t.number").sort(true, false)])
            .unwrap()
            .alias("a")
            .unwrap()
            .build()
            .unwrap();

        let sort_plan = plan.inputs()[0].clone();
        let scan_plan = sort_plan.inputs()[0].clone();

        // Test aliased_columns_for from scan to final plan
        assert_eq!(
            aliased_columns_for(&[qcol("t.number")].into(), &plan, Some(&scan_plan)).unwrap(),
            [(qcol("t.number"), [qcol("a.number")].into())].into()
        );

        // Test aliased_columns_for from sort to final plan
        assert_eq!(
            aliased_columns_for(&[qcol("t.number")].into(), &plan, Some(&sort_plan)).unwrap(),
            [(qcol("t.number"), [qcol("a.number")].into())].into()
        );

        // Test aliased_columns_for from leaf to final plan
        assert_eq!(
            aliased_columns_for(&[qcol("t.number")].into(), &plan, None).unwrap(),
            [(qcol("t.number"), [qcol("a.number")].into())].into()
        );

        // Test original_column_for from final plan to scan
        assert_eq!(
            original_column_for(
                &[qcol("a.number")].into(),
                plan.clone(),
                Some(Arc::new(scan_plan.clone()))
            )
            .unwrap(),
            [(qcol("a.number"), qcol("t.number"))].into()
        );

        // Test original_column_for from final plan to sort
        assert_eq!(
            original_column_for(
                &[qcol("a.number")].into(),
                plan.clone(),
                Some(Arc::new(sort_plan.clone()))
            )
            .unwrap(),
            [(qcol("a.number"), qcol("t.number"))].into()
        );
    }

    #[test]
    fn proj_alias_layered_tracker() {
        // use logging for better debugging
        init_default_ut_logging();
        let test_table = TestTable::table_with_name(0, "t".to_string());
        let table_source = Arc::new(DefaultTableSource::new(Arc::new(
            DfTableProviderAdapter::new(test_table),
        )));
        // scan(t) -> Projection[number, pk3 AS pk1, pk2 AS pk3]
        //         -> Projection[number, pk1 AS pk2, pk3 AS pk1]
        // (names are deliberately reused across layers to check per-layer resolution)
        let plan = LogicalPlanBuilder::scan_with_filters("t", table_source, None, vec![])
            .unwrap()
            .project(vec![
                col("number"),
                col("pk3").alias("pk1"),
                col("pk2").alias("pk3"),
            ])
            .unwrap()
            .project(vec![
                col("number"),
                col("pk1").alias("pk2"),
                col("pk3").alias("pk1"),
            ])
            .unwrap()
            .build()
            .unwrap();

        let first_proj = plan.inputs()[0].clone();
        let scan_plan = first_proj.inputs()[0].clone();

        // Test original_column_for from final plan to scan
        assert_eq!(
            original_column_for(
                &[qcol("pk1")].into(),
                plan.clone(),
                Some(Arc::new(scan_plan.clone()))
            )
            .unwrap(),
            [(qcol("pk1"), qcol("t.pk2"))].into()
        );

        // Test original_column_for from final plan to first projection
        assert_eq!(
            original_column_for(
                &[qcol("pk1")].into(),
                plan.clone(),
                Some(Arc::new(first_proj.clone()))
            )
            .unwrap(),
            [(qcol("pk1"), qcol("pk3"))].into()
        );

        // Test original_column_for when the target is the final plan itself (identity mapping)
        assert_eq!(
            original_column_for(
                &[qcol("pk1")].into(),
                plan.clone(),
                Some(Arc::new(plan.clone()))
            )
            .unwrap(),
            [(qcol("pk1"), qcol("pk1"))].into()
        );

        // Test aliased_columns_for from scan to first projection
        assert_eq!(
            aliased_columns_for(&[qcol("t.pk2")].into(), &first_proj, Some(&scan_plan)).unwrap(),
            [(qcol("t.pk2"), [qcol("pk3")].into())].into()
        );

        // Test aliased_columns_for from first projection to final plan
        assert_eq!(
            aliased_columns_for(&[qcol("pk3")].into(), &plan, Some(&first_proj)).unwrap(),
            [(qcol("pk3"), [qcol("pk1")].into())].into()
        );

        // Test aliased_columns_for from scan to final plan
        assert_eq!(
            aliased_columns_for(&[qcol("t.pk2")].into(), &plan, Some(&scan_plan)).unwrap(),
            [(qcol("t.pk2"), [qcol("pk1")].into())].into()
        );

        // Test aliased_columns_for from leaf to final plan
        assert_eq!(
            aliased_columns_for(&[qcol("pk2")].into(), &plan, None).unwrap(),
            [(qcol("pk2"), [qcol("pk1")].into())].into()
        );
    }

    #[test]
    fn proj_alias_relation_layered_tracker() {
        // use logging for better debugging
        init_default_ut_logging();
        let test_table = TestTable::table_with_name(0, "t".to_string());
        let table_source = Arc::new(DefaultTableSource::new(Arc::new(
            DfTableProviderAdapter::new(test_table),
        )));
        // Two aliases share the name "pk1" but differ in relation ("a" vs "b")
        let plan = LogicalPlanBuilder::scan_with_filters("t", table_source, None, vec![])
            .unwrap()
            .project(vec![
                col("number"),
                col("pk3").alias_qualified(Some("b"), "pk1"),
                col("pk2").alias_qualified(Some("a"), "pk1"),
            ])
            .unwrap()
            .build()
            .unwrap();

        let scan_plan = plan.inputs()[0].clone();

        // Test aliased_columns_for from scan to projection
        assert_eq!(
            aliased_columns_for(&[qcol("t.pk2")].into(), &plan, Some(&scan_plan)).unwrap(),
            [(qcol("t.pk2"), [qcol("a.pk1")].into())].into()
        );
    }

    #[test]
    fn proj_alias_aliased_aggr() {
        // use logging for better debugging
        init_default_ut_logging();
        let test_table = TestTable::table_with_name(0, "t".to_string());
        let table_source = Arc::new(DefaultTableSource::new(Arc::new(
            DfTableProviderAdapter::new(test_table),
        )));
        // scan(t) -> Projection[number, pk1 AS pk3, pk2 AS pk4]
        //         -> Projection[number, pk3 AS pk42, pk4 AS pk43]
        //         -> Aggregate[group by pk42, pk43; min(number)]
        let plan = LogicalPlanBuilder::scan_with_filters("t", table_source, None, vec![])
            .unwrap()
            .project(vec![
                col("number"),
                col("pk1").alias("pk3"),
                col("pk2").alias("pk4"),
            ])
            .unwrap()
            .project(vec![
                col("number"),
                col("pk3").alias("pk42"),
                col("pk4").alias("pk43"),
            ])
            .unwrap()
            .aggregate(vec![col("pk42"), col("pk43")], vec![min(col("number"))])
            .unwrap()
            .build()
            .unwrap();

        let aggr_plan = plan.clone();
        let second_proj = aggr_plan.inputs()[0].clone();
        let first_proj = second_proj.inputs()[0].clone();
        let scan_plan = first_proj.inputs()[0].clone();

        // Test aliased_columns_for from scan to final plan
        assert_eq!(
            aliased_columns_for(&[qcol("t.pk1")].into(), &plan, Some(&scan_plan)).unwrap(),
            [(qcol("t.pk1"), [qcol("pk42")].into())].into()
        );

        // Test aliased_columns_for from the leaf (the scan, `None` target) to first projection,
        // starting from an unqualified column name
        assert_eq!(
            aliased_columns_for(&[Column::from_name("pk1")].into(), &first_proj, None).unwrap(),
            [(Column::from_name("pk1"), [qcol("pk3")].into())].into()
        );
    }

    #[test]
    fn aggr_aggr_alias() {
        // use logging for better debugging
        init_default_ut_logging();
        let test_table = TestTable::table_with_name(0, "t".to_string());
        let table_source = Arc::new(DefaultTableSource::new(Arc::new(
            DfTableProviderAdapter::new(test_table),
        )));
        // scan(t) -> Aggregate[group by pk1, pk2; max(number)]
        //         -> Aggregate[group by pk1, pk2; min(max(t.number))]
        let plan = LogicalPlanBuilder::scan_with_filters("t", table_source, None, vec![])
            .unwrap()
            .aggregate(vec![col("pk1"), col("pk2")], vec![max(col("number"))])
            .unwrap()
            .aggregate(
                vec![col("pk1"), col("pk2")],
                vec![min(col("max(t.number)"))],
            )
            .unwrap()
            .build()
            .unwrap();

        let second_aggr = plan.clone();
        let first_aggr = second_aggr.inputs()[0].clone();
        let scan_plan = first_aggr.inputs()[0].clone();

        // Test aliased_columns_for from scan to final plan (identity mapping for aggregates)
        assert_eq!(
            aliased_columns_for(&[qcol("t.pk1")].into(), &plan, Some(&scan_plan)).unwrap(),
            [(qcol("t.pk1"), [qcol("t.pk1")].into())].into()
        );

        // Test aliased_columns_for from scan to first aggregate
        assert_eq!(
            aliased_columns_for(&[qcol("t.pk1")].into(), &first_aggr, Some(&scan_plan)).unwrap(),
            [(qcol("t.pk1"), [qcol("t.pk1")].into())].into()
        );

        // Test aliased_columns_for from first aggregate to final plan
        assert_eq!(
            aliased_columns_for(&[qcol("t.pk1")].into(), &plan, Some(&first_aggr)).unwrap(),
            [(qcol("t.pk1"), [qcol("t.pk1")].into())].into()
        );

        // Test aliased_columns_for from leaf to final plan
        assert_eq!(
            aliased_columns_for(&[Column::from_name("pk1")].into(), &plan, None).unwrap(),
            [(Column::from_name("pk1"), [qcol("t.pk1")].into())].into()
        );
    }

    #[test]
    fn aggr_aggr_alias_projection() {
        // use logging for better debugging
        init_default_ut_logging();
        let test_table = TestTable::table_with_name(0, "t".to_string());
        let table_source = Arc::new(DefaultTableSource::new(Arc::new(
            DfTableProviderAdapter::new(test_table),
        )));
        // Same two aggregates as `aggr_aggr_alias`, followed by a projection that aliases both
        // the group-by columns and the aggregate-generated column.
        let plan = LogicalPlanBuilder::scan_with_filters("t", table_source, None, vec![])
            .unwrap()
            .aggregate(vec![col("pk1"), col("pk2")], vec![max(col("number"))])
            .unwrap()
            .aggregate(
                vec![col("pk1"), col("pk2")],
                vec![min(col("max(t.number)"))],
            )
            .unwrap()
            .project(vec![
                col("pk1").alias("pk11"),
                col("pk2").alias("pk22"),
                col("min(max(t.number))").alias("min_max_number"),
            ])
            .unwrap()
            .build()
            .unwrap();

        let proj_plan = plan.clone();
        let second_aggr = proj_plan.inputs()[0].clone();

        // Test original_column_for from projection to second aggregate for aggr gen column
        assert_eq!(
            original_column_for(
                &[Column::from_name("min_max_number")].into(),
                plan.clone(),
                Some(Arc::new(second_aggr.clone()))
            )
            .unwrap(),
            [(
                Column::from_name("min_max_number"),
                Column::from_name("min(max(t.number))")
            )]
            .into()
        );

        // Test aliased_columns_for from second aggregate to projection
        assert_eq!(
            aliased_columns_for(
                &[Column::from_name("min(max(t.number))")].into(),
                &plan,
                Some(&second_aggr)
            )
            .unwrap(),
            [(
                Column::from_name("min(max(t.number))"),
                [Column::from_name("min_max_number")].into()
            )]
            .into()
        );
    }
}