query/dist_plan/analyzer/
utils.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
16use std::sync::Arc;
17
18use datafusion::error::Result as DfResult;
19use datafusion_common::Column;
20use datafusion_expr::expr::Alias;
21use datafusion_expr::{Expr, LogicalPlan};
22
23/// Return all the original columns(at original node) for the given aliased columns at the aliased node
24///
25/// if `original_node` is None, it means original columns are from leaf node
26#[allow(unused)]
27pub fn original_column_for(
28    aliased_columns: &HashSet<Column>,
29    aliased_node: LogicalPlan,
30    original_node: Option<Arc<LogicalPlan>>,
31) -> DfResult<HashMap<Column, Column>> {
32    let schema_cols: HashSet<Column> = aliased_node.schema().columns().iter().cloned().collect();
33    let cur_aliases: HashMap<Column, Column> = aliased_columns
34        .iter()
35        .filter(|c| schema_cols.contains(c))
36        .map(|c| (c.clone(), c.clone()))
37        .collect();
38
39    if cur_aliases.is_empty() {
40        return Ok(HashMap::new());
41    }
42
43    original_column_for_inner(cur_aliases, &aliased_node, &original_node)
44}
45
46fn original_column_for_inner(
47    mut cur_aliases: HashMap<Column, Column>,
48    node: &LogicalPlan,
49    original_node: &Option<Arc<LogicalPlan>>,
50) -> DfResult<HashMap<Column, Column>> {
51    let mut current_node = node;
52
53    loop {
54        // Base case: check if we've reached the target node
55        if let Some(original_node) = original_node {
56            if *current_node == **original_node {
57                return Ok(cur_aliases);
58            }
59        } else if current_node.inputs().is_empty() {
60            // leaf node reached
61            return Ok(cur_aliases);
62        }
63
64        // Validate node has exactly one child
65        if current_node.inputs().len() != 1 {
66            return Err(datafusion::error::DataFusionError::Internal(
67                "only accept plan with at most one child".to_string(),
68            ));
69        }
70
71        // Get alias layer and update aliases
72        let layer = get_alias_layer_from_node(current_node)?;
73        let mut new_aliases = HashMap::new();
74        for (start_alias, cur_alias) in cur_aliases {
75            if let Some(old_column) = layer.get_old_from_new(cur_alias.clone()) {
76                new_aliases.insert(start_alias, old_column);
77            }
78        }
79
80        // Move to child node and continue iteration
81        cur_aliases = new_aliases;
82        current_node = current_node.inputs()[0];
83    }
84}
85
86/// Return all the aliased columns(at aliased node) for the given original columns(at original node)
87///
88/// if `original_node` is None, it means original columns are from leaf node
89pub fn aliased_columns_for(
90    original_columns: &HashSet<Column>,
91    aliased_node: &LogicalPlan,
92    original_node: Option<&LogicalPlan>,
93) -> DfResult<HashMap<Column, HashSet<Column>>> {
94    let initial_aliases: HashMap<Column, HashSet<Column>> = {
95        if let Some(original) = &original_node {
96            let schema_cols: HashSet<Column> = original.schema().columns().into_iter().collect();
97            original_columns
98                .iter()
99                .filter(|c| schema_cols.contains(c))
100                .map(|c| (c.clone(), HashSet::from([c.clone()])))
101                .collect()
102        } else {
103            original_columns
104                .iter()
105                .map(|c| (c.clone(), HashSet::from([c.clone()])))
106                .collect()
107        }
108    };
109
110    if initial_aliases.is_empty() {
111        return Ok(HashMap::new());
112    }
113
114    aliased_columns_for_inner(initial_aliases, aliased_node, original_node)
115}
116
117fn aliased_columns_for_inner(
118    cur_aliases: HashMap<Column, HashSet<Column>>,
119    node: &LogicalPlan,
120    original_node: Option<&LogicalPlan>,
121) -> DfResult<HashMap<Column, HashSet<Column>>> {
122    // First, collect the path from current node to the target node
123    let mut path = Vec::new();
124    let mut current_node = node;
125
126    // Descend to the target node, collecting nodes along the way
127    loop {
128        // Base case: check if we've reached the target node
129        if let Some(original_node) = original_node {
130            if *current_node == *original_node {
131                break;
132            }
133        } else if current_node.inputs().is_empty() {
134            // leaf node reached
135            break;
136        }
137
138        // Validate node has exactly one child
139        if current_node.inputs().len() != 1 {
140            return Err(datafusion::error::DataFusionError::Internal(
141                "only accept plan with at most one child".to_string(),
142            ));
143        }
144
145        // Add current node to path and move to child
146        path.push(current_node);
147        current_node = current_node.inputs()[0];
148    }
149
150    // Now apply alias layers in reverse order (from original to aliased)
151    let mut result = cur_aliases;
152    for &node_in_path in path.iter().rev() {
153        let layer = get_alias_layer_from_node(node_in_path)?;
154        let mut new_aliases = HashMap::new();
155        for (original_column, cur_alias_set) in result {
156            let mut new_alias_set = HashSet::new();
157            for cur_alias in cur_alias_set {
158                new_alias_set.extend(layer.get_new_from_old(cur_alias.clone()));
159            }
160            if !new_alias_set.is_empty() {
161                new_aliases.insert(original_column, new_alias_set);
162            }
163        }
164        result = new_aliases;
165    }
166
167    Ok(result)
168}
169
170/// Return a mapping of original column to all the aliased columns in current node of the plan
171fn get_alias_layer_from_node(node: &LogicalPlan) -> DfResult<AliasLayer> {
172    match node {
173        LogicalPlan::Projection(proj) => Ok(get_alias_layer_from_exprs(&proj.expr)),
174        LogicalPlan::Aggregate(aggr) => Ok(get_alias_layer_from_exprs(&aggr.group_expr)),
175        LogicalPlan::SubqueryAlias(subquery_alias) => {
176            let mut layer = AliasLayer::default();
177            let old_columns = subquery_alias.input.schema().columns();
178            for old_column in old_columns {
179                let new_column = Column::new(
180                    Some(subquery_alias.alias.clone()),
181                    old_column.name().to_string(),
182                );
183                // mapping from old_column to new_column
184                layer.insert_alias(old_column, HashSet::from([new_column]));
185            }
186            Ok(layer)
187        }
188        LogicalPlan::TableScan(scan) => {
189            let columns = scan.projected_schema.columns();
190            let mut layer = AliasLayer::default();
191            for col in columns {
192                layer.insert_alias(col.clone(), HashSet::from([col.clone()]));
193            }
194            Ok(layer)
195        }
196        _ => {
197            let input_schema = node
198                .inputs()
199                .first()
200                .ok_or_else(|| {
201                    datafusion::error::DataFusionError::Internal(
202                        "only accept plan with at most one child".to_string(),
203                    )
204                })?
205                .schema();
206            let output_schema = node.schema();
207            // only accept at most one child plan, and if not one of the above nodes,
208            // also shouldn't modify the schema or else alias scope tracker can't support them
209            if node.inputs().len() > 1 {
210                Err(datafusion::error::DataFusionError::Internal(format!(
211                    "only accept plan with at most one child, found: {}",
212                    node
213                )))
214            } else if node.inputs().len() == 1 {
215                if input_schema != output_schema {
216                    let input_columns = input_schema.columns();
217                    let all_input_is_in_output = input_columns
218                        .iter()
219                        .all(|c| output_schema.is_column_from_schema(c));
220                    if all_input_is_in_output {
221                        // all input is in output, so it's just adding some columns, we can do identity mapping for input columns
222                        let mut layer = AliasLayer::default();
223                        for col in input_columns {
224                            layer.insert_alias(col.clone(), HashSet::from([col.clone()]));
225                        }
226                        Ok(layer)
227                    } else {
228                        // otherwise use the intersection of input and output
229                        // TODO(discord9): maybe just make this case unsupported for now?
230                        common_telemetry::warn!(
231                            "Might be unsupported plan for alias tracking, track alias anyway: {}",
232                            node
233                        );
234                        let input_columns = input_schema.columns();
235                        let output_columns =
236                            output_schema.columns().into_iter().collect::<HashSet<_>>();
237                        let common_columns: HashSet<Column> = input_columns
238                            .iter()
239                            .filter(|c| output_columns.contains(c))
240                            .cloned()
241                            .collect();
242
243                        let mut layer = AliasLayer::default();
244                        for col in &common_columns {
245                            layer.insert_alias(col.clone(), HashSet::from([col.clone()]));
246                        }
247                        Ok(layer)
248                    }
249                } else {
250                    // identity mapping
251                    let mut layer = AliasLayer::default();
252                    for col in output_schema.columns() {
253                        layer.insert_alias(col.clone(), HashSet::from([col.clone()]));
254                    }
255                    Ok(layer)
256                }
257            } else {
258                // unknown plan with no input, error msg
259                Err(datafusion::error::DataFusionError::Internal(format!(
260                    "Unsupported plan with no input: {}",
261                    node
262                )))
263            }
264        }
265    }
266}
267
268fn get_alias_layer_from_exprs(exprs: &[Expr]) -> AliasLayer {
269    let mut alias_mapping: HashMap<Column, HashSet<Column>> = HashMap::new();
270    for expr in exprs {
271        if let Expr::Alias(alias) = expr {
272            if let Some(column) = get_alias_original_column(alias) {
273                alias_mapping
274                    .entry(column.clone())
275                    .or_default()
276                    .insert(Column::new(alias.relation.clone(), alias.name.clone()));
277            }
278        } else if let Expr::Column(column) = expr {
279            // identity mapping
280            alias_mapping
281                .entry(column.clone())
282                .or_default()
283                .insert(column.clone());
284        }
285    }
286    let mut layer = AliasLayer::default();
287    for (old_column, new_columns) in alias_mapping {
288        layer.insert_alias(old_column, new_columns);
289    }
290    layer
291}
292
293#[derive(Default, Debug, Clone)]
294struct AliasLayer {
295    /// for convenient of querying, key is field's name
296    old_to_new: BTreeMap<Column, HashSet<Column>>,
297}
298
299impl AliasLayer {
300    pub fn insert_alias(&mut self, old_column: Column, new_columns: HashSet<Column>) {
301        self.old_to_new
302            .entry(old_column)
303            .or_default()
304            .extend(new_columns);
305    }
306
307    pub fn get_new_from_old(&self, old_column: Column) -> HashSet<Column> {
308        let mut res_cols = HashSet::new();
309        for (old, new_cols) in self.old_to_new.iter() {
310            if old.name() == old_column.name() {
311                match (&old.relation, &old_column.relation) {
312                    (Some(o), Some(c)) => {
313                        if o.resolved_eq(c) {
314                            res_cols.extend(new_cols.clone());
315                        }
316                    }
317                    _ => {
318                        // if any of the two relation is None, meaning not fully qualified, just match name
319                        res_cols.extend(new_cols.clone());
320                    }
321                }
322            }
323        }
324        res_cols
325    }
326
327    pub fn get_old_from_new(&self, new_column: Column) -> Option<Column> {
328        for (old, new_set) in &self.old_to_new {
329            if new_set.iter().any(|n| {
330                if n.name() != new_column.name() {
331                    return false;
332                }
333                match (&n.relation, &new_column.relation) {
334                    (Some(r1), Some(r2)) => r1.resolved_eq(r2),
335                    _ => true,
336                }
337            }) {
338                return Some(old.clone());
339            }
340        }
341        None
342    }
343}
344
345fn get_alias_original_column(alias: &Alias) -> Option<Column> {
346    let mut cur_alias = alias;
347    while let Expr::Alias(inner_alias) = cur_alias.expr.as_ref() {
348        cur_alias = inner_alias;
349    }
350    if let Expr::Column(column) = cur_alias.expr.as_ref() {
351        return Some(column.clone());
352    }
353
354    None
355}
356
/// Mapping of an original column in the table to all of its aliases at the current node.
///
/// NOTE(review): the key is a `String`, presumably the original column's name — confirm against
/// callers.
pub type AliasMapping = BTreeMap<String, BTreeSet<Column>>;
359
// Unit tests for the alias-tracking helpers. Each test builds a small chain of single-input
// logical plans over the test table `t`. It checks both directions: `aliased_columns_for`
// (original -> aliased) and `original_column_for` (aliased -> original).
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use common_telemetry::init_default_ut_logging;
    use datafusion::datasource::DefaultTableSource;
    use datafusion::functions_aggregate::min_max::{max, min};
    use datafusion_expr::{LogicalPlanBuilder, col};
    use pretty_assertions::assert_eq;
    use table::table::adapter::DfTableProviderAdapter;

    use super::*;
    use crate::dist_plan::analyzer::test::TestTable;

    /// Build a `Column` from a possibly qualified name such as `"t.pk1"` or `"pk1"`.
    fn qcol(name: &str) -> Column {
        Column::from_qualified_name(name)
    }

    /// Two stacked projections, where one source column (`pk3`) fans out into several aliases.
    #[test]
    fn proj_multi_layered_alias_tracker() {
        // use logging for better debugging
        init_default_ut_logging();
        let test_table = TestTable::table_with_name(0, "t".to_string());
        let table_source = Arc::new(DefaultTableSource::new(Arc::new(
            DfTableProviderAdapter::new(test_table),
        )));
        let plan = LogicalPlanBuilder::scan_with_filters("t", table_source, None, vec![])
            .unwrap()
            .project(vec![
                col("number"),
                col("pk3").alias("pk1"),
                col("pk3").alias("pk2"),
            ])
            .unwrap()
            .project(vec![
                col("number"),
                col("pk2").alias("pk4"),
                col("pk1").alias("pk5"),
            ])
            .unwrap()
            .build()
            .unwrap();

        // `child` is the first projection (exposes number, pk1, pk2)
        let child = plan.inputs()[0].clone();

        assert_eq!(
            aliased_columns_for(
                &HashSet::from([qcol("pk1"), qcol("pk2")]),
                &plan,
                Some(&child)
            )
            .unwrap(),
            HashMap::from([
                (qcol("pk1"), HashSet::from([qcol("pk5")])),
                (qcol("pk2"), HashSet::from([qcol("pk4")]))
            ])
        );

        // columns not in the plan should return empty mapping
        assert_eq!(
            aliased_columns_for(
                &HashSet::from([qcol("pk1"), qcol("pk2")]),
                &plan,
                Some(&plan)
            )
            .unwrap(),
            HashMap::from([])
        );

        // `t.pk3` is not exposed by `child` (it is only visible as pk1/pk2), so nothing is traced
        assert_eq!(
            aliased_columns_for(&HashSet::from([qcol("t.pk3")]), &plan, Some(&child)).unwrap(),
            HashMap::from([])
        );

        // tracing back to the leaf: both final aliases originate from `t.pk3`
        assert_eq!(
            original_column_for(
                &HashSet::from([qcol("pk5"), qcol("pk4")]),
                plan.clone(),
                None
            )
            .unwrap(),
            HashMap::from([(qcol("pk5"), qcol("t.pk3")), (qcol("pk4"), qcol("t.pk3"))])
        );

        assert_eq!(
            aliased_columns_for(&HashSet::from([qcol("pk3")]), &plan, None).unwrap(),
            HashMap::from([(qcol("pk3"), HashSet::from([qcol("pk5"), qcol("pk4")]))])
        );
        assert_eq!(
            original_column_for(
                &HashSet::from([qcol("pk1"), qcol("pk2")]),
                child.clone(),
                None
            )
            .unwrap(),
            HashMap::from([(qcol("pk1"), qcol("t.pk3")), (qcol("pk2"), qcol("t.pk3"))])
        );

        assert_eq!(
            aliased_columns_for(&HashSet::from([qcol("pk3")]), &child, None).unwrap(),
            HashMap::from([(qcol("pk3"), HashSet::from([qcol("pk1"), qcol("pk2")]))])
        );

        // stopping at the first projection only undoes the second projection's aliases
        assert_eq!(
            original_column_for(
                &HashSet::from([qcol("pk4"), qcol("pk5")]),
                plan.clone(),
                Some(Arc::new(child.clone()))
            )
            .unwrap(),
            HashMap::from([(qcol("pk4"), qcol("pk2")), (qcol("pk5"), qcol("pk1"))])
        );
    }

    /// A `SubqueryAlias` over a `Sort`: the sort is a schema-preserving pass-through and the
    /// subquery alias re-qualifies `t.*` as `a.*`.
    #[test]
    fn sort_subquery_alias_layered_tracker() {
        let test_table = TestTable::table_with_name(0, "t".to_string());
        let table_source = Arc::new(DefaultTableSource::new(Arc::new(
            DfTableProviderAdapter::new(test_table),
        )));

        let plan = LogicalPlanBuilder::scan_with_filters("t", table_source, None, vec![])
            .unwrap()
            .sort(vec![col("t.number").sort(true, false)])
            .unwrap()
            .alias("a")
            .unwrap()
            .build()
            .unwrap();

        let sort_plan = plan.inputs()[0].clone();
        let scan_plan = sort_plan.inputs()[0].clone();

        // Test aliased_columns_for from scan to final plan
        assert_eq!(
            aliased_columns_for(&HashSet::from([qcol("t.number")]), &plan, Some(&scan_plan))
                .unwrap(),
            HashMap::from([(qcol("t.number"), HashSet::from([qcol("a.number")]))])
        );

        // Test aliased_columns_for from sort to final plan
        assert_eq!(
            aliased_columns_for(&HashSet::from([qcol("t.number")]), &plan, Some(&sort_plan))
                .unwrap(),
            HashMap::from([(qcol("t.number"), HashSet::from([qcol("a.number")]))])
        );

        // Test aliased_columns_for from leaf to final plan
        assert_eq!(
            aliased_columns_for(&HashSet::from([qcol("t.number")]), &plan, None).unwrap(),
            HashMap::from([(qcol("t.number"), HashSet::from([qcol("a.number")]))])
        );

        // Test original_column_for from final plan to scan
        assert_eq!(
            original_column_for(
                &HashSet::from([qcol("a.number")]),
                plan.clone(),
                Some(Arc::new(scan_plan.clone()))
            )
            .unwrap(),
            HashMap::from([(qcol("a.number"), qcol("t.number"))])
        );

        // Test original_column_for from final plan to sort
        assert_eq!(
            original_column_for(
                &HashSet::from([qcol("a.number")]),
                plan.clone(),
                Some(Arc::new(sort_plan.clone()))
            )
            .unwrap(),
            HashMap::from([(qcol("a.number"), qcol("t.number"))])
        );
    }

    /// Two projections that reuse and swap names (`pk1`/`pk3`) between layers, so a name means
    /// a different source column at each level.
    #[test]
    fn proj_alias_layered_tracker() {
        // use logging for better debugging
        init_default_ut_logging();
        let test_table = TestTable::table_with_name(0, "t".to_string());
        let table_source = Arc::new(DefaultTableSource::new(Arc::new(
            DfTableProviderAdapter::new(test_table),
        )));
        let plan = LogicalPlanBuilder::scan_with_filters("t", table_source, None, vec![])
            .unwrap()
            .project(vec![
                col("number"),
                col("pk3").alias("pk1"),
                col("pk2").alias("pk3"),
            ])
            .unwrap()
            .project(vec![
                col("number"),
                col("pk1").alias("pk2"),
                col("pk3").alias("pk1"),
            ])
            .unwrap()
            .build()
            .unwrap();

        let first_proj = plan.inputs()[0].clone();
        let scan_plan = first_proj.inputs()[0].clone();

        // Test original_column_for from final plan to scan
        assert_eq!(
            original_column_for(
                &HashSet::from([qcol("pk1")]),
                plan.clone(),
                Some(Arc::new(scan_plan.clone()))
            )
            .unwrap(),
            HashMap::from([(qcol("pk1"), qcol("t.pk2"))])
        );

        // Test original_column_for from final plan to first projection
        assert_eq!(
            original_column_for(
                &HashSet::from([qcol("pk1")]),
                plan.clone(),
                Some(Arc::new(first_proj.clone()))
            )
            .unwrap(),
            HashMap::from([(qcol("pk1"), qcol("pk3"))])
        );

        // Test original_column_for when the target node is the aliased node itself
        // (no layers are applied, so each column maps to itself)
        assert_eq!(
            original_column_for(
                &HashSet::from([qcol("pk1")]),
                plan.clone(),
                Some(Arc::new(plan.clone()))
            )
            .unwrap(),
            HashMap::from([(qcol("pk1"), qcol("pk1"))])
        );

        // Test aliased_columns_for from scan to first projection
        assert_eq!(
            aliased_columns_for(
                &HashSet::from([qcol("t.pk2")]),
                &first_proj,
                Some(&scan_plan)
            )
            .unwrap(),
            HashMap::from([(qcol("t.pk2"), HashSet::from([qcol("pk3")]))])
        );

        // Test aliased_columns_for from first projection to final plan
        assert_eq!(
            aliased_columns_for(&HashSet::from([qcol("pk3")]), &plan, Some(&first_proj)).unwrap(),
            HashMap::from([(qcol("pk3"), HashSet::from([qcol("pk1")]))])
        );

        // Test aliased_columns_for from scan to final plan
        assert_eq!(
            aliased_columns_for(&HashSet::from([qcol("t.pk2")]), &plan, Some(&scan_plan)).unwrap(),
            HashMap::from([(qcol("t.pk2"), HashSet::from([qcol("pk1")]))])
        );

        // Test aliased_columns_for from leaf to final plan
        assert_eq!(
            aliased_columns_for(&HashSet::from([qcol("pk2")]), &plan, None).unwrap(),
            HashMap::from([(qcol("pk2"), HashSet::from([qcol("pk1")]))])
        );
    }

    /// Aliases with distinct relations but the same name (`b.pk1` vs `a.pk1`) must not be
    /// conflated.
    #[test]
    fn proj_alias_relation_layered_tracker() {
        // use logging for better debugging
        init_default_ut_logging();
        let test_table = TestTable::table_with_name(0, "t".to_string());
        let table_source = Arc::new(DefaultTableSource::new(Arc::new(
            DfTableProviderAdapter::new(test_table),
        )));
        let plan = LogicalPlanBuilder::scan_with_filters("t", table_source, None, vec![])
            .unwrap()
            .project(vec![
                col("number"),
                col("pk3").alias_qualified(Some("b"), "pk1"),
                col("pk2").alias_qualified(Some("a"), "pk1"),
            ])
            .unwrap()
            .build()
            .unwrap();

        let scan_plan = plan.inputs()[0].clone();

        // Test aliased_columns_for from scan to projection
        assert_eq!(
            aliased_columns_for(&HashSet::from([qcol("t.pk2")]), &plan, Some(&scan_plan)).unwrap(),
            HashMap::from([(qcol("t.pk2"), HashSet::from([qcol("a.pk1")]))])
        );
    }

    /// Two aliasing projections followed by an aggregate that groups by the aliased columns.
    #[test]
    fn proj_alias_aliased_aggr() {
        // use logging for better debugging
        init_default_ut_logging();
        let test_table = TestTable::table_with_name(0, "t".to_string());
        let table_source = Arc::new(DefaultTableSource::new(Arc::new(
            DfTableProviderAdapter::new(test_table),
        )));
        let plan = LogicalPlanBuilder::scan_with_filters("t", table_source, None, vec![])
            .unwrap()
            .project(vec![
                col("number"),
                col("pk1").alias("pk3"),
                col("pk2").alias("pk4"),
            ])
            .unwrap()
            .project(vec![
                col("number"),
                col("pk3").alias("pk42"),
                col("pk4").alias("pk43"),
            ])
            .unwrap()
            .aggregate(vec![col("pk42"), col("pk43")], vec![min(col("number"))])
            .unwrap()
            .build()
            .unwrap();

        let aggr_plan = plan.clone();
        let second_proj = aggr_plan.inputs()[0].clone();
        let first_proj = second_proj.inputs()[0].clone();
        let scan_plan = first_proj.inputs()[0].clone();

        // Test aliased_columns_for from scan to final plan
        assert_eq!(
            aliased_columns_for(&HashSet::from([qcol("t.pk1")]), &plan, Some(&scan_plan)).unwrap(),
            HashMap::from([(qcol("t.pk1"), HashSet::from([qcol("pk42")]))])
        );

        // Test aliased_columns_for from leaf to first projection, using an unqualified
        // column (matched by name only)
        assert_eq!(
            aliased_columns_for(
                &HashSet::from([Column::from_name("pk1")]),
                &first_proj,
                None
            )
            .unwrap(),
            HashMap::from([(Column::from_name("pk1"), HashSet::from([qcol("pk3")]))])
        );
    }

    /// Two stacked aggregates grouping by the same columns: group-by columns map to themselves.
    #[test]
    fn aggr_aggr_alias() {
        // use logging for better debugging
        init_default_ut_logging();
        let test_table = TestTable::table_with_name(0, "t".to_string());
        let table_source = Arc::new(DefaultTableSource::new(Arc::new(
            DfTableProviderAdapter::new(test_table),
        )));
        let plan = LogicalPlanBuilder::scan_with_filters("t", table_source, None, vec![])
            .unwrap()
            .aggregate(vec![col("pk1"), col("pk2")], vec![max(col("number"))])
            .unwrap()
            .aggregate(
                vec![col("pk1"), col("pk2")],
                vec![min(col("max(t.number)"))],
            )
            .unwrap()
            .build()
            .unwrap();

        let second_aggr = plan.clone();
        let first_aggr = second_aggr.inputs()[0].clone();
        let scan_plan = first_aggr.inputs()[0].clone();

        // Test aliased_columns_for from scan to final plan (identity mapping for aggregates)
        assert_eq!(
            aliased_columns_for(&HashSet::from([qcol("t.pk1")]), &plan, Some(&scan_plan)).unwrap(),
            HashMap::from([(qcol("t.pk1"), HashSet::from([qcol("t.pk1")]))])
        );

        // Test aliased_columns_for from scan to first aggregate
        assert_eq!(
            aliased_columns_for(
                &HashSet::from([qcol("t.pk1")]),
                &first_aggr,
                Some(&scan_plan)
            )
            .unwrap(),
            HashMap::from([(qcol("t.pk1"), HashSet::from([qcol("t.pk1")]))])
        );

        // Test aliased_columns_for from first aggregate to final plan
        assert_eq!(
            aliased_columns_for(&HashSet::from([qcol("t.pk1")]), &plan, Some(&first_aggr)).unwrap(),
            HashMap::from([(qcol("t.pk1"), HashSet::from([qcol("t.pk1")]))])
        );

        // Test aliased_columns_for from leaf to final plan
        assert_eq!(
            aliased_columns_for(&HashSet::from([Column::from_name("pk1")]), &plan, None).unwrap(),
            HashMap::from([(Column::from_name("pk1"), HashSet::from([qcol("t.pk1")]))])
        );
    }

    /// Aggregates followed by a projection that aliases an aggregate-generated column.
    #[test]
    fn aggr_aggr_alias_projection() {
        // use logging for better debugging
        init_default_ut_logging();
        let test_table = TestTable::table_with_name(0, "t".to_string());
        let table_source = Arc::new(DefaultTableSource::new(Arc::new(
            DfTableProviderAdapter::new(test_table),
        )));
        let plan = LogicalPlanBuilder::scan_with_filters("t", table_source, None, vec![])
            .unwrap()
            .aggregate(vec![col("pk1"), col("pk2")], vec![max(col("number"))])
            .unwrap()
            .aggregate(
                vec![col("pk1"), col("pk2")],
                vec![min(col("max(t.number)"))],
            )
            .unwrap()
            .project(vec![
                col("pk1").alias("pk11"),
                col("pk2").alias("pk22"),
                col("min(max(t.number))").alias("min_max_number"),
            ])
            .unwrap()
            .build()
            .unwrap();

        let proj_plan = plan.clone();
        let second_aggr = proj_plan.inputs()[0].clone();

        // Test original_column_for from projection to second aggregate for aggr gen column
        assert_eq!(
            original_column_for(
                &HashSet::from([Column::from_name("min_max_number")]),
                plan.clone(),
                Some(Arc::new(second_aggr.clone()))
            )
            .unwrap(),
            HashMap::from([(
                Column::from_name("min_max_number"),
                Column::from_name("min(max(t.number))")
            )])
        );

        // Test aliased_columns_for from second aggregate to projection
        assert_eq!(
            aliased_columns_for(
                &HashSet::from([Column::from_name("min(max(t.number))")]),
                &plan,
                Some(&second_aggr)
            )
            .unwrap(),
            HashMap::from([(
                Column::from_name("min(max(t.number))"),
                HashSet::from([Column::from_name("min_max_number")])
            )])
        );
    }
}