1use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
16use std::sync::Arc;
17
18use datafusion::error::Result as DfResult;
19use datafusion_common::Column;
20use datafusion_common::tree_node::{Transformed, TreeNode as _};
21use datafusion_expr::expr::Alias;
22use datafusion_expr::{Expr, Extension, LogicalPlan};
23
24use crate::dist_plan::merge_sort::MergeSortLogicalPlan;
25use crate::plan::ExtractExpr as _;
26
27fn rewrite_column(
28 mapping: &BTreeMap<Column, BTreeSet<Column>>,
29 original_node: &LogicalPlan,
30 alias_node: &LogicalPlan,
31) -> impl Fn(Expr) -> DfResult<Transformed<Expr>> {
32 move |e: Expr| {
33 if let Expr::Column(col) = e {
34 if let Some(aliased_cols) = mapping.get(&col) {
35 if let Some(aliased_col) = aliased_cols.iter().next() {
37 Ok(Transformed::yes(Expr::Column(aliased_col.clone())))
38 } else {
39 Err(datafusion_common::DataFusionError::Internal(format!(
40 "PlanRewriter: expand: column {col} from {original_node}\n has empty alias set in plan: {alias_node}\n but expect at least one alias",
41 )))
42 }
43 } else {
44 Err(datafusion_common::DataFusionError::Internal(format!(
45 "PlanRewriter: expand: column {col} from {original_node}\n has no alias in plan: {alias_node}",
46 )))
47 }
48 } else {
49 Ok(Transformed::no(e))
50 }
51 }
52}
53
54pub fn rewrite_merge_sort_exprs(
56 merge_sort: &MergeSortLogicalPlan,
57 aliased_node: &LogicalPlan,
58) -> DfResult<LogicalPlan> {
59 let merge_sort = LogicalPlan::Extension(Extension {
60 node: Arc::new(merge_sort.clone()),
61 });
62
63 let sort_input = merge_sort.inputs().first().cloned().ok_or_else(|| {
65 datafusion_common::DataFusionError::Internal(format!(
66 "PlanRewriter: expand: merge sort stage has no input: {merge_sort}"
67 ))
68 })?;
69 let sort_exprs = merge_sort.expressions_consider_join();
70 let column_refs = sort_exprs
71 .iter()
72 .flat_map(|e| e.column_refs().into_iter().cloned())
73 .collect::<BTreeSet<_>>();
74 let column_alias_mapping = aliased_columns_for(&column_refs, aliased_node, Some(sort_input))?;
75 let aliased_sort_exprs = sort_exprs
76 .into_iter()
77 .map(|e| {
78 e.transform(rewrite_column(
79 &column_alias_mapping,
80 &merge_sort,
81 aliased_node,
82 ))
83 })
84 .map(|e| e.map(|e| e.data))
85 .collect::<DfResult<Vec<_>>>()?;
86 let new_merge_sort = merge_sort.with_new_exprs(
87 aliased_sort_exprs,
88 merge_sort.inputs().into_iter().cloned().collect(),
89 )?;
90 Ok(new_merge_sort)
91}
92
93#[allow(unused)]
99pub fn original_column_for(
100 aliased_columns: &BTreeSet<Column>,
101 aliased_node: LogicalPlan,
102 original_node: Option<Arc<LogicalPlan>>,
103) -> DfResult<BTreeMap<Column, Column>> {
104 let schema_cols: BTreeSet<Column> = aliased_node.schema().columns().iter().cloned().collect();
105 let cur_aliases: BTreeMap<Column, Column> = aliased_columns
106 .iter()
107 .filter(|c| schema_cols.contains(c))
108 .map(|c| (c.clone(), c.clone()))
109 .collect();
110
111 if cur_aliases.is_empty() {
112 return Ok(BTreeMap::new());
113 }
114
115 original_column_for_inner(cur_aliases, &aliased_node, &original_node)
116}
117
118fn original_column_for_inner(
119 mut cur_aliases: BTreeMap<Column, Column>,
120 node: &LogicalPlan,
121 original_node: &Option<Arc<LogicalPlan>>,
122) -> DfResult<BTreeMap<Column, Column>> {
123 let mut current_node = node;
124
125 loop {
126 if let Some(original_node) = original_node
128 && *current_node == **original_node
129 {
130 return Ok(cur_aliases);
131 } else if current_node.inputs().is_empty() {
132 return Ok(cur_aliases);
134 }
135
136 if current_node.inputs().len() != 1 {
138 return Err(datafusion::error::DataFusionError::Internal(format!(
139 "only accept plan with at most one child, found: {}",
140 current_node
141 )));
142 }
143
144 let layer = get_alias_layer_from_node(current_node)?;
146 let mut new_aliases = BTreeMap::new();
147 for (start_alias, cur_alias) in cur_aliases {
148 if let Some(old_column) = layer.get_old_from_new(cur_alias.clone()) {
149 new_aliases.insert(start_alias, old_column);
150 }
151 }
152
153 cur_aliases = new_aliases;
155 current_node = current_node.inputs()[0];
156 }
157}
158
159pub fn aliased_columns_for(
165 original_columns: &BTreeSet<Column>,
166 aliased_node: &LogicalPlan,
167 original_node: Option<&LogicalPlan>,
168) -> DfResult<BTreeMap<Column, BTreeSet<Column>>> {
169 let initial_aliases: BTreeMap<Column, BTreeSet<Column>> = {
170 if let Some(original) = &original_node {
171 let schema_cols: BTreeSet<Column> = original.schema().columns().into_iter().collect();
172 original_columns
173 .iter()
174 .filter(|c| schema_cols.contains(c))
175 .map(|c| (c.clone(), [c.clone()].into()))
176 .collect()
177 } else {
178 original_columns
179 .iter()
180 .map(|c| (c.clone(), [c.clone()].into()))
181 .collect()
182 }
183 };
184
185 if initial_aliases.is_empty() {
186 return Ok(BTreeMap::new());
187 }
188
189 aliased_columns_for_inner(initial_aliases, aliased_node, original_node)
190}
191
192fn aliased_columns_for_inner(
193 cur_aliases: BTreeMap<Column, BTreeSet<Column>>,
194 node: &LogicalPlan,
195 original_node: Option<&LogicalPlan>,
196) -> DfResult<BTreeMap<Column, BTreeSet<Column>>> {
197 let mut path = Vec::new();
199 let mut current_node = node;
200
201 loop {
203 if let Some(original_node) = original_node
205 && *current_node == *original_node
206 {
207 break;
208 } else if current_node.inputs().is_empty() {
209 break;
211 }
212
213 if current_node.inputs().len() != 1 {
215 return Err(datafusion::error::DataFusionError::Internal(format!(
216 "only accept plan with at most one child, found: {}",
217 current_node
218 )));
219 }
220
221 path.push(current_node);
223 current_node = current_node.inputs()[0];
224 }
225
226 let mut result = cur_aliases;
228 for &node_in_path in path.iter().rev() {
229 let layer = get_alias_layer_from_node(node_in_path)?;
230 let mut new_aliases = BTreeMap::new();
231 for (original_column, cur_alias_set) in result {
232 let mut new_alias_set = BTreeSet::new();
233 for cur_alias in cur_alias_set {
234 new_alias_set.extend(layer.get_new_from_old(cur_alias.clone()));
235 }
236 if !new_alias_set.is_empty() {
237 new_aliases.insert(original_column, new_alias_set);
238 }
239 }
240 result = new_aliases;
241 }
242
243 Ok(result)
244}
245
246fn get_alias_layer_from_node(node: &LogicalPlan) -> DfResult<AliasLayer> {
249 match node {
250 LogicalPlan::Projection(proj) => Ok(get_alias_layer_from_exprs(&proj.expr)),
251 LogicalPlan::Aggregate(aggr) => Ok(get_alias_layer_from_exprs(&aggr.group_expr)),
252 LogicalPlan::SubqueryAlias(subquery_alias) => {
253 let mut layer = AliasLayer::default();
254 let old_columns = subquery_alias.input.schema().columns();
255 for old_column in old_columns {
256 let new_column = Column::new(
257 Some(subquery_alias.alias.clone()),
258 old_column.name().to_string(),
259 );
260 layer.insert_alias(old_column, [new_column].into());
262 }
263 Ok(layer)
264 }
265 LogicalPlan::TableScan(scan) => {
266 let columns = scan.projected_schema.columns();
267 let mut layer = AliasLayer::default();
268 for col in columns {
269 layer.insert_alias(col.clone(), [col.clone()].into());
270 }
271 Ok(layer)
272 }
273 _ => {
274 let input_schema = node
275 .inputs()
276 .first()
277 .ok_or_else(|| {
278 datafusion::error::DataFusionError::Internal(format!(
279 "only accept plan with at most one child, found: {}",
280 node
281 ))
282 })?
283 .schema();
284 let output_schema = node.schema();
285 if node.inputs().len() > 1 {
288 Err(datafusion::error::DataFusionError::Internal(format!(
289 "only accept plan with at most one child, found: {}",
290 node
291 )))
292 } else if node.inputs().len() == 1 {
293 if input_schema != output_schema {
294 let input_columns = input_schema.columns();
295 let all_input_is_in_output = input_columns
296 .iter()
297 .all(|c| output_schema.is_column_from_schema(c));
298 if all_input_is_in_output {
299 let mut layer = AliasLayer::default();
301 for col in input_columns {
302 layer.insert_alias(col.clone(), [col.clone()].into());
303 }
304 Ok(layer)
305 } else {
306 common_telemetry::debug!(
309 "Might be unsupported plan for alias tracking, track alias anyway: {}",
310 node
311 );
312 let input_columns = input_schema.columns();
313 let output_columns =
314 output_schema.columns().into_iter().collect::<HashSet<_>>();
315 let common_columns: HashSet<Column> = input_columns
316 .iter()
317 .filter(|c| output_columns.contains(c))
318 .cloned()
319 .collect();
320
321 let mut layer = AliasLayer::default();
322 for col in &common_columns {
323 layer.insert_alias(col.clone(), [col.clone()].into());
324 }
325 Ok(layer)
326 }
327 } else {
328 let mut layer = AliasLayer::default();
330 for col in output_schema.columns() {
331 layer.insert_alias(col.clone(), [col.clone()].into());
332 }
333 Ok(layer)
334 }
335 } else {
336 Err(datafusion::error::DataFusionError::Internal(format!(
338 "Unsupported plan with no input: {}",
339 node
340 )))
341 }
342 }
343 }
344}
345
346fn get_alias_layer_from_exprs(exprs: &[Expr]) -> AliasLayer {
347 let mut alias_mapping: HashMap<Column, HashSet<Column>> = HashMap::new();
348 for expr in exprs {
349 if let Expr::Alias(alias) = expr {
350 if let Some(column) = get_alias_original_column(alias) {
351 alias_mapping
352 .entry(column.clone())
353 .or_default()
354 .insert(Column::new(alias.relation.clone(), alias.name.clone()));
355 }
356 } else if let Expr::Column(column) = expr {
357 alias_mapping
359 .entry(column.clone())
360 .or_default()
361 .insert(column.clone());
362 }
363 }
364 let mut layer = AliasLayer::default();
365 for (old_column, new_columns) in alias_mapping {
366 layer.insert_alias(old_column, new_columns);
367 }
368 layer
369}
370
371#[derive(Default, Debug, Clone)]
372struct AliasLayer {
373 old_to_new: BTreeMap<Column, HashSet<Column>>,
375}
376
377impl AliasLayer {
378 pub fn insert_alias(&mut self, old_column: Column, new_columns: HashSet<Column>) {
379 self.old_to_new
380 .entry(old_column)
381 .or_default()
382 .extend(new_columns);
383 }
384
385 pub fn get_new_from_old(&self, old_column: Column) -> HashSet<Column> {
386 let mut res_cols = HashSet::new();
387 for (old, new_cols) in self.old_to_new.iter() {
388 if old.name() == old_column.name() {
389 match (&old.relation, &old_column.relation) {
390 (Some(o), Some(c)) => {
391 if o.resolved_eq(c) {
392 res_cols.extend(new_cols.clone());
393 }
394 }
395 _ => {
396 res_cols.extend(new_cols.clone());
398 }
399 }
400 }
401 }
402 res_cols
403 }
404
405 pub fn get_old_from_new(&self, new_column: Column) -> Option<Column> {
406 for (old, new_set) in &self.old_to_new {
407 if new_set.iter().any(|n| {
408 if n.name() != new_column.name() {
409 return false;
410 }
411 match (&n.relation, &new_column.relation) {
412 (Some(r1), Some(r2)) => r1.resolved_eq(r2),
413 _ => true,
414 }
415 }) {
416 return Some(old.clone());
417 }
418 }
419 None
420 }
421}
422
423fn get_alias_original_column(alias: &Alias) -> Option<Column> {
424 let mut cur_alias = alias;
425 while let Expr::Alias(inner_alias) = cur_alias.expr.as_ref() {
426 cur_alias = inner_alias;
427 }
428 if let Expr::Column(column) = cur_alias.expr.as_ref() {
429 return Some(column.clone());
430 }
431
432 None
433}
434
/// A map from a string key to a set of [`Column`]s.
///
/// NOTE(review): this alias is not used in the visible part of this file;
/// presumably the key is an original column name and the value is the set of
/// aliases that column is known by — confirm against the callers.
pub type AliasMapping = BTreeMap<String, BTreeSet<Column>>;
437
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use common_telemetry::init_default_ut_logging;
    use datafusion::datasource::DefaultTableSource;
    use datafusion::functions_aggregate::min_max::{max, min};
    use datafusion_expr::{LogicalPlanBuilder, col};
    use pretty_assertions::assert_eq;
    use table::table::adapter::DfTableProviderAdapter;

    use super::*;
    use crate::dist_plan::analyzer::test::TestTable;

    /// Shorthand for building a [`Column`] from a possibly qualified name such
    /// as `"t.pk1"` or `"pk1"`.
    fn qcol(name: &str) -> Column {
        Column::from_qualified_name(name)
    }

    /// One source column (`pk3`) is aliased twice by the first projection and
    /// each alias is renamed again by the second projection.
    #[test]
    fn proj_multi_layered_alias_tracker() {
        init_default_ut_logging();
        let test_table = TestTable::table_with_name(0, "t".to_string());
        let table_source = Arc::new(DefaultTableSource::new(Arc::new(
            DfTableProviderAdapter::new(test_table),
        )));
        let plan = LogicalPlanBuilder::scan_with_filters("t", table_source, None, vec![])
            .unwrap()
            .project(vec![
                col("number"),
                col("pk3").alias("pk1"),
                col("pk3").alias("pk2"),
            ])
            .unwrap()
            .project(vec![
                col("number"),
                col("pk2").alias("pk4"),
                col("pk1").alias("pk5"),
            ])
            .unwrap()
            .build()
            .unwrap();

        let child = plan.inputs()[0].clone();

        // Columns of the first projection, as seen from the second projection.
        assert_eq!(
            aliased_columns_for(&[qcol("pk1"), qcol("pk2")].into(), &plan, Some(&child)).unwrap(),
            [
                (qcol("pk1"), [qcol("pk5")].into()),
                (qcol("pk2"), [qcol("pk4")].into())
            ]
            .into()
        );

        // `pk1`/`pk2` are not in the schema of the top projection itself.
        assert_eq!(
            aliased_columns_for(&[qcol("pk1"), qcol("pk2")].into(), &plan, Some(&plan)).unwrap(),
            [].into()
        );

        // `t.pk3` is not in the schema of the first projection.
        assert_eq!(
            aliased_columns_for(&[qcol("t.pk3")].into(), &plan, Some(&child)).unwrap(),
            [].into()
        );

        // Without a stopping node the walk goes down to the table scan.
        assert_eq!(
            original_column_for(&[qcol("pk5"), qcol("pk4")].into(), plan.clone(), None).unwrap(),
            [(qcol("pk5"), qcol("t.pk3")), (qcol("pk4"), qcol("t.pk3"))].into()
        );

        // One source column fans out to both final aliases.
        assert_eq!(
            aliased_columns_for(&[qcol("pk3")].into(), &plan, None).unwrap(),
            [(qcol("pk3"), [qcol("pk5"), qcol("pk4")].into())].into()
        );
        assert_eq!(
            original_column_for(&[qcol("pk1"), qcol("pk2")].into(), child.clone(), None).unwrap(),
            [(qcol("pk1"), qcol("t.pk3")), (qcol("pk2"), qcol("t.pk3"))].into()
        );

        assert_eq!(
            aliased_columns_for(&[qcol("pk3")].into(), &child, None).unwrap(),
            [(qcol("pk3"), [qcol("pk1"), qcol("pk2")].into())].into()
        );

        // Stopping at the first projection resolves only one layer.
        assert_eq!(
            original_column_for(
                &[qcol("pk4"), qcol("pk5")].into(),
                plan.clone(),
                Some(Arc::new(child.clone()))
            )
            .unwrap(),
            [(qcol("pk4"), qcol("pk2")), (qcol("pk5"), qcol("pk1"))].into()
        );
    }

    /// A sort keeps column names unchanged while a subquery alias re-qualifies
    /// them (`t.number` becomes `a.number`).
    #[test]
    fn sort_subquery_alias_layered_tracker() {
        let test_table = TestTable::table_with_name(0, "t".to_string());
        let table_source = Arc::new(DefaultTableSource::new(Arc::new(
            DfTableProviderAdapter::new(test_table),
        )));

        let plan = LogicalPlanBuilder::scan_with_filters("t", table_source, None, vec![])
            .unwrap()
            .sort(vec![col("t.number").sort(true, false)])
            .unwrap()
            .alias("a")
            .unwrap()
            .build()
            .unwrap();

        let sort_plan = plan.inputs()[0].clone();
        let scan_plan = sort_plan.inputs()[0].clone();

        // From the scan, through sort and subquery alias.
        assert_eq!(
            aliased_columns_for(&[qcol("t.number")].into(), &plan, Some(&scan_plan)).unwrap(),
            [(qcol("t.number"), [qcol("a.number")].into())].into()
        );

        // From the sort, through the subquery alias only.
        assert_eq!(
            aliased_columns_for(&[qcol("t.number")].into(), &plan, Some(&sort_plan)).unwrap(),
            [(qcol("t.number"), [qcol("a.number")].into())].into()
        );

        // Without an explicit original node.
        assert_eq!(
            aliased_columns_for(&[qcol("t.number")].into(), &plan, None).unwrap(),
            [(qcol("t.number"), [qcol("a.number")].into())].into()
        );

        // Reverse direction, down to the scan.
        assert_eq!(
            original_column_for(
                &[qcol("a.number")].into(),
                plan.clone(),
                Some(Arc::new(scan_plan.clone()))
            )
            .unwrap(),
            [(qcol("a.number"), qcol("t.number"))].into()
        );

        // Reverse direction, down to the sort.
        assert_eq!(
            original_column_for(
                &[qcol("a.number")].into(),
                plan.clone(),
                Some(Arc::new(sort_plan.clone()))
            )
            .unwrap(),
            [(qcol("a.number"), qcol("t.number"))].into()
        );
    }

    /// Two projections that reuse the same names (`pk1`, `pk2`, `pk3`) for
    /// different source columns at each layer.
    #[test]
    fn proj_alias_layered_tracker() {
        init_default_ut_logging();
        let test_table = TestTable::table_with_name(0, "t".to_string());
        let table_source = Arc::new(DefaultTableSource::new(Arc::new(
            DfTableProviderAdapter::new(test_table),
        )));
        let plan = LogicalPlanBuilder::scan_with_filters("t", table_source, None, vec![])
            .unwrap()
            .project(vec![
                col("number"),
                col("pk3").alias("pk1"),
                col("pk2").alias("pk3"),
            ])
            .unwrap()
            .project(vec![
                col("number"),
                col("pk1").alias("pk2"),
                col("pk3").alias("pk1"),
            ])
            .unwrap()
            .build()
            .unwrap();

        let first_proj = plan.inputs()[0].clone();
        let scan_plan = first_proj.inputs()[0].clone();

        // Final `pk1` <- first projection `pk3` <- scan `t.pk2`.
        assert_eq!(
            original_column_for(
                &[qcol("pk1")].into(),
                plan.clone(),
                Some(Arc::new(scan_plan.clone()))
            )
            .unwrap(),
            [(qcol("pk1"), qcol("t.pk2"))].into()
        );

        // Stopping at the first projection resolves a single layer.
        assert_eq!(
            original_column_for(
                &[qcol("pk1")].into(),
                plan.clone(),
                Some(Arc::new(first_proj.clone()))
            )
            .unwrap(),
            [(qcol("pk1"), qcol("pk3"))].into()
        );

        // Stopping at the starting node leaves the column mapped to itself.
        assert_eq!(
            original_column_for(
                &[qcol("pk1")].into(),
                plan.clone(),
                Some(Arc::new(plan.clone()))
            )
            .unwrap(),
            [(qcol("pk1"), qcol("pk1"))].into()
        );

        // Forward direction, one layer: scan -> first projection.
        assert_eq!(
            aliased_columns_for(&[qcol("t.pk2")].into(), &first_proj, Some(&scan_plan)).unwrap(),
            [(qcol("t.pk2"), [qcol("pk3")].into())].into()
        );

        // Forward direction, one layer: first projection -> second projection.
        assert_eq!(
            aliased_columns_for(&[qcol("pk3")].into(), &plan, Some(&first_proj)).unwrap(),
            [(qcol("pk3"), [qcol("pk1")].into())].into()
        );

        // Forward direction, both layers.
        assert_eq!(
            aliased_columns_for(&[qcol("t.pk2")].into(), &plan, Some(&scan_plan)).unwrap(),
            [(qcol("t.pk2"), [qcol("pk1")].into())].into()
        );

        // Unqualified name and no explicit original node.
        assert_eq!(
            aliased_columns_for(&[qcol("pk2")].into(), &plan, None).unwrap(),
            [(qcol("pk2"), [qcol("pk1")].into())].into()
        );
    }

    /// Two aliases share the name `pk1` but carry different qualifiers (`a`
    /// and `b`); each source column must resolve only to its own alias.
    #[test]
    fn proj_alias_relation_layered_tracker() {
        init_default_ut_logging();
        let test_table = TestTable::table_with_name(0, "t".to_string());
        let table_source = Arc::new(DefaultTableSource::new(Arc::new(
            DfTableProviderAdapter::new(test_table),
        )));
        let plan = LogicalPlanBuilder::scan_with_filters("t", table_source, None, vec![])
            .unwrap()
            .project(vec![
                col("number"),
                col("pk3").alias_qualified(Some("b"), "pk1"),
                col("pk2").alias_qualified(Some("a"), "pk1"),
            ])
            .unwrap()
            .build()
            .unwrap();

        let scan_plan = plan.inputs()[0].clone();

        // `t.pk2` maps to `a.pk1`, not to the same-named `b.pk1`.
        assert_eq!(
            aliased_columns_for(&[qcol("t.pk2")].into(), &plan, Some(&scan_plan)).unwrap(),
            [(qcol("t.pk2"), [qcol("a.pk1")].into())].into()
        );
    }

    /// Aliases introduced by two projections are followed through an aggregate
    /// that groups by the aliased columns.
    #[test]
    fn proj_alias_aliased_aggr() {
        init_default_ut_logging();
        let test_table = TestTable::table_with_name(0, "t".to_string());
        let table_source = Arc::new(DefaultTableSource::new(Arc::new(
            DfTableProviderAdapter::new(test_table),
        )));
        let plan = LogicalPlanBuilder::scan_with_filters("t", table_source, None, vec![])
            .unwrap()
            .project(vec![
                col("number"),
                col("pk1").alias("pk3"),
                col("pk2").alias("pk4"),
            ])
            .unwrap()
            .project(vec![
                col("number"),
                col("pk3").alias("pk42"),
                col("pk4").alias("pk43"),
            ])
            .unwrap()
            .aggregate(vec![col("pk42"), col("pk43")], vec![min(col("number"))])
            .unwrap()
            .build()
            .unwrap();

        let aggr_plan = plan.clone();
        let second_proj = aggr_plan.inputs()[0].clone();
        let first_proj = second_proj.inputs()[0].clone();
        let scan_plan = first_proj.inputs()[0].clone();

        // Scan column through both projections and the aggregate.
        assert_eq!(
            aliased_columns_for(&[qcol("t.pk1")].into(), &plan, Some(&scan_plan)).unwrap(),
            [(qcol("t.pk1"), [qcol("pk42")].into())].into()
        );

        // Unqualified column through the first projection only.
        assert_eq!(
            aliased_columns_for(&[Column::from_name("pk1")].into(), &first_proj, None).unwrap(),
            [(Column::from_name("pk1"), [qcol("pk3")].into())].into()
        );
    }

    /// Group-by columns keep their name through two stacked aggregates.
    #[test]
    fn aggr_aggr_alias() {
        init_default_ut_logging();
        let test_table = TestTable::table_with_name(0, "t".to_string());
        let table_source = Arc::new(DefaultTableSource::new(Arc::new(
            DfTableProviderAdapter::new(test_table),
        )));
        let plan = LogicalPlanBuilder::scan_with_filters("t", table_source, None, vec![])
            .unwrap()
            .aggregate(vec![col("pk1"), col("pk2")], vec![max(col("number"))])
            .unwrap()
            .aggregate(
                vec![col("pk1"), col("pk2")],
                vec![min(col("max(t.number)"))],
            )
            .unwrap()
            .build()
            .unwrap();

        let second_aggr = plan.clone();
        let first_aggr = second_aggr.inputs()[0].clone();
        let scan_plan = first_aggr.inputs()[0].clone();

        // Scan -> second aggregate.
        assert_eq!(
            aliased_columns_for(&[qcol("t.pk1")].into(), &plan, Some(&scan_plan)).unwrap(),
            [(qcol("t.pk1"), [qcol("t.pk1")].into())].into()
        );

        // Scan -> first aggregate.
        assert_eq!(
            aliased_columns_for(&[qcol("t.pk1")].into(), &first_aggr, Some(&scan_plan)).unwrap(),
            [(qcol("t.pk1"), [qcol("t.pk1")].into())].into()
        );

        // First aggregate -> second aggregate.
        assert_eq!(
            aliased_columns_for(&[qcol("t.pk1")].into(), &plan, Some(&first_aggr)).unwrap(),
            [(qcol("t.pk1"), [qcol("t.pk1")].into())].into()
        );

        // Unqualified name and no explicit original node.
        assert_eq!(
            aliased_columns_for(&[Column::from_name("pk1")].into(), &plan, None).unwrap(),
            [(Column::from_name("pk1"), [qcol("t.pk1")].into())].into()
        );
    }

    /// A projection on top of two aggregates renames an aggregate output
    /// column; the alias is tracked in both directions across the projection.
    #[test]
    fn aggr_aggr_alias_projection() {
        init_default_ut_logging();
        let test_table = TestTable::table_with_name(0, "t".to_string());
        let table_source = Arc::new(DefaultTableSource::new(Arc::new(
            DfTableProviderAdapter::new(test_table),
        )));
        let plan = LogicalPlanBuilder::scan_with_filters("t", table_source, None, vec![])
            .unwrap()
            .aggregate(vec![col("pk1"), col("pk2")], vec![max(col("number"))])
            .unwrap()
            .aggregate(
                vec![col("pk1"), col("pk2")],
                vec![min(col("max(t.number)"))],
            )
            .unwrap()
            .project(vec![
                col("pk1").alias("pk11"),
                col("pk2").alias("pk22"),
                col("min(max(t.number))").alias("min_max_number"),
            ])
            .unwrap()
            .build()
            .unwrap();

        let proj_plan = plan.clone();
        let second_aggr = proj_plan.inputs()[0].clone();

        // Alias back to the aggregate output column it was made from.
        assert_eq!(
            original_column_for(
                &[Column::from_name("min_max_number")].into(),
                plan.clone(),
                Some(Arc::new(second_aggr.clone()))
            )
            .unwrap(),
            [(
                Column::from_name("min_max_number"),
                Column::from_name("min(max(t.number))")
            )]
            .into()
        );

        // Aggregate output column forward to its alias.
        assert_eq!(
            aliased_columns_for(
                &[Column::from_name("min(max(t.number))")].into(),
                &plan,
                Some(&second_aggr)
            )
            .unwrap(),
            [(
                Column::from_name("min(max(t.number))"),
                [Column::from_name("min_max_number")].into()
            )]
            .into()
        );
    }
}