use std::collections::{BTreeMap, BTreeSet, HashSet};
use std::sync::Arc;

use common_telemetry::debug;
use datafusion::config::{ConfigExtension, ExtensionOptions};
use datafusion::datasource::DefaultTableSource;
use datafusion::error::Result as DfResult;
use datafusion_common::Column;
use datafusion_common::config::ConfigOptions;
use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRewriter};
use datafusion_expr::expr::{Exists, InSubquery};
use datafusion_expr::utils::expr_to_columns;
use datafusion_expr::{Expr, LogicalPlan, LogicalPlanBuilder, Subquery, col as col_fn};
use datafusion_optimizer::analyzer::AnalyzerRule;
use promql::extension_plan::SeriesDivide;
use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
use table::metadata::TableType;
use table::table::adapter::DfTableProviderAdapter;

use crate::dist_plan::analyzer::utils::{
    PatchOptimizerContext, PlanTreeExpressionSimplifier, aliased_columns_for,
    rewrite_merge_sort_exprs,
};
use crate::dist_plan::commutativity::{
    Categorizer, Commutativity, partial_commutative_transformer,
};
use crate::dist_plan::merge_scan::MergeScanLogicalPlan;
use crate::dist_plan::merge_sort::MergeSortLogicalPlan;
use crate::metrics::PUSH_DOWN_FALLBACK_ERRORS_TOTAL;
use crate::plan::ExtractExpr;
use crate::query_engine::DefaultSerializer;

#[cfg(test)]
mod test;

mod fallback;
pub(crate) mod utils;

pub(crate) use utils::AliasMapping;

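/// Placeholder partition-column name used when the physical table is
/// partitioned on columns that the logical table does not expose.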
const OTHER_PHY_PART_COL_PLACEHOLDER: &str = "__OTHER_PHYSICAL_PART_COLS_PLACEHOLDER__";

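/// Options for the distributed planner, registered as a DataFusion config
/// extension under the `dist_planner` prefix.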
#[derive(Debug, Clone)]
pub struct DistPlannerOptions {
    pub allow_query_fallback: bool,
}

impl ConfigExtension for DistPlannerOptions {
    const PREFIX: &'static str = "dist_planner";
}

impl ExtensionOptions for DistPlannerOptions {
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    fn as_any_mut(&mut self) -> &mut dyn std::any::Any {
        self
    }

    fn cloned(&self) -> Box<dyn ExtensionOptions> {
        Box::new(self.clone())
    }

    fn set(&mut self, key: &str, value: &str) -> DfResult<()> {
        Err(datafusion_common::DataFusionError::NotImplemented(format!(
            "DistPlannerOptions does not support setting key {key} to value {value}"
        )))
    }

    fn entries(&self) -> Vec<datafusion::config::ConfigEntry> {
        vec![datafusion::config::ConfigEntry {
            key: "allow_query_fallback".to_string(),
            value: Some(self.allow_query_fallback.to_string()),
            description: "Allow falling back to the fallback plan rewriter when push-down fails",
        }]
    }
}

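/// Analyzer rule that rewrites a logical plan for distributed execution,
/// splitting it at a [`MergeScanLogicalPlan`] boundary: the part below the
/// merge scan is executed remotely, the part above stays on the frontend.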
#[derive(Debug)]
pub struct DistPlannerAnalyzer;

impl AnalyzerRule for DistPlannerAnalyzer {
    fn name(&self) -> &str {
        "DistPlannerAnalyzer"
    }

    fn analyze(
        &self,
        plan: LogicalPlan,
        config: &ConfigOptions,
    ) -> datafusion_common::Result<LogicalPlan> {
        let mut config = config.clone();
        // Enable the `FilterNullJoinKeys` optimizer rule for the rewritten plan.
        config.optimizer.filter_null_join_keys = true;
        let config = Arc::new(config);

        let optimizer_context = PatchOptimizerContext {
            inner: datafusion_optimizer::OptimizerContext::new(),
            config: config.clone(),
        };

        let plan = plan
            .rewrite_with_subqueries(&mut PlanTreeExpressionSimplifier::new(optimizer_context))?
            .data;

        let opt = config.extensions.get::<DistPlannerOptions>();
        let allow_fallback = opt.map(|o| o.allow_query_fallback).unwrap_or(false);

        let result = match self.try_push_down(plan.clone()) {
            Ok(plan) => plan,
            Err(err) => {
                if allow_fallback {
                    common_telemetry::warn!(err; "Failed to push down plan, using fallback plan rewriter for plan: {plan}");
                    PUSH_DOWN_FALLBACK_ERRORS_TOTAL.inc();
                    self.use_fallback(plan)?
                } else {
                    return Err(err);
                }
            }
        };

        Ok(result)
    }
}

impl DistPlannerAnalyzer {
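    /// Try to rewrite the plan, pushing as much of it as possible down below a
    /// `MergeScan` node.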
    fn try_push_down(&self, plan: LogicalPlan) -> DfResult<LogicalPlan> {
        let plan = plan.transform(&Self::inspect_plan_with_subquery)?;
        let mut rewriter = PlanRewriter::default();
        let result = plan.data.rewrite(&mut rewriter)?.data;
        Ok(result)
    }

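    /// Rewrite with the simpler fallback rewriter instead of failing the query
    /// when the regular push-down cannot handle the plan.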
    fn use_fallback(&self, plan: LogicalPlan) -> DfResult<LogicalPlan> {
        let mut rewriter = fallback::FallbackPlanRewriter;
        let result = plan.rewrite(&mut rewriter)?.data;
        Ok(result)
    }

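    /// Rewrite subqueries found in the plan's expressions so they are
    /// distributed-planned as well.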
    fn inspect_plan_with_subquery(plan: LogicalPlan) -> DfResult<Transformed<LogicalPlan>> {
        // Skip `Limit` and `Distinct`: their expressions are not rewritten here.
        if let LogicalPlan::Limit(_) | LogicalPlan::Distinct(_) = &plan {
            return Ok(Transformed::no(plan));
        }

        let exprs = plan
            .expressions_consider_join()
            .into_iter()
            .map(|e| e.transform(&Self::transform_subquery).map(|x| x.data))
            .collect::<DfResult<Vec<_>>>()?;

        // `Unnest` is also left as-is rather than rebuilt via `with_new_exprs`.
        if !matches!(plan, LogicalPlan::Unnest(_)) {
            let inputs = plan.inputs().into_iter().cloned().collect::<Vec<_>>();
            Ok(Transformed::yes(plan.with_new_exprs(exprs, inputs)?))
        } else {
            Ok(Transformed::no(plan))
        }
    }

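    /// Rewrite the plans inside `EXISTS`, `IN (subquery)` and scalar subquery
    /// expressions.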
    fn transform_subquery(expr: Expr) -> DfResult<Transformed<Expr>> {
        match expr {
            Expr::Exists(exists) => Ok(Transformed::yes(Expr::Exists(Exists {
                subquery: Self::handle_subquery(exists.subquery)?,
                negated: exists.negated,
            }))),
            Expr::InSubquery(in_subquery) => Ok(Transformed::yes(Expr::InSubquery(InSubquery {
                expr: in_subquery.expr,
                subquery: Self::handle_subquery(in_subquery.subquery)?,
                negated: in_subquery.negated,
            }))),
            Expr::ScalarSubquery(scalar_subquery) => Ok(Transformed::yes(Expr::ScalarSubquery(
                Self::handle_subquery(scalar_subquery)?,
            ))),
            _ => Ok(Transformed::no(expr)),
        }
    }

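    /// Run the `PlanRewriter` over a subquery's plan, wrapping the result in a
    /// plain projection when the new root is an extension node so the subquery
    /// keeps an ordinary schema.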
    fn handle_subquery(subquery: Subquery) -> DfResult<Subquery> {
        let mut rewriter = PlanRewriter::default();
        let mut rewrote_subquery = subquery
            .subquery
            .as_ref()
            .clone()
            .rewrite(&mut rewriter)?
            .data;
        // If the rewrite put an extension node (e.g. a `MergeScan`) at the root,
        // project its output columns so the subquery exposes a plain schema.
        if matches!(rewrote_subquery, LogicalPlan::Extension(_)) {
            let output_schema = rewrote_subquery.schema().clone();
            let project_exprs = output_schema
                .fields()
                .iter()
                .map(|f| col_fn(f.name()))
                .collect::<Vec<_>>();
            rewrote_subquery = LogicalPlanBuilder::from(rewrote_subquery)
                .project(project_exprs)?
                .build()?;
        }

        Ok(Subquery {
            subquery: Arc::new(rewrote_subquery),
            outer_ref_columns: subquery.outer_ref_columns,
            spans: Default::default(),
        })
    }
}

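/// Status of the [`PlanRewriter`] for the subtree currently being visited.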
#[derive(Debug, Default, PartialEq, Eq, PartialOrd, Ord)]
enum RewriterStatus {
    /// The subtree has not been expanded into a `MergeScan` yet.
    #[default]
    Unexpanded,
    /// The subtree is already expanded; nothing more to do on the way up.
    Expanded,
}

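/// Rewrites a logical plan bottom-up for distributed execution: nodes that
/// commute with the table's partitioning are kept (or staged) to run above the
/// `MergeScan`, and as soon as a node can no longer be pushed down the
/// accumulated subtree is expanded into a `MergeScan`.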
#[derive(Debug, Default)]
struct PlanRewriter {
    /// Current depth in the plan tree, incremented in `f_down`.
    level: usize,
    /// Simulated stack of ancestors built during traversal: `(node, level)`.
    stack: Vec<(LogicalPlan, usize)>,
    /// Partially commutative plans staged to be re-applied above the
    /// `MergeScan`.
    stage: Vec<LogicalPlan>,
    /// Whether the current subtree has been expanded already.
    status: RewriterStatus,
    /// Partition columns of the scanned table, each mapped to its currently
    /// visible aliases; `None` until a table scan is visited.
    partition_cols: Option<AliasMapping>,
    /// Columns required by staged plans, together with the level at which each
    /// requirement was recorded.
    column_requirements: Vec<(HashSet<Column>, usize)>,
    /// Force expansion on the next `should_expand` call.
    expand_on_next_call: bool,
    /// Expand when the next partial/conditional/transformed commutative plan
    /// is encountered.
    expand_on_next_part_cond_trans_commutative: bool,
    /// Replacement child plan produced by a `TransformedCommutative`
    /// transformer, applied at expansion time.
    new_child_plan: Option<LogicalPlan>,
}

impl PlanRewriter {
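    /// Returns the parent of the node currently being visited: the stack entry
    /// exactly one level above the current one.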
    fn get_parent(&self) -> Option<&LogicalPlan> {
        self.stack
            .iter()
            .rev()
            .find(|(_, level)| *level == self.level - 1)
            .map(|(node, _)| node)
    }

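    /// Decide whether the traversal should stop here and expand the current
    /// node's subtree into a `MergeScan`. May stage extra parent plans and
    /// record column requirements as a side effect.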
    fn should_expand(&mut self, plan: &LogicalPlan) -> DfResult<bool> {
        debug!(
            "Check should_expand at level {} with stack:\n{}",
            self.level,
            self.stack
                .iter()
                .map(|(p, l)| format!("{l}:{}{}", " ".repeat(l - 1), p.display()))
                .collect::<Vec<String>>()
                .join("\n"),
        );
        // Anything that can't be serialized to substrait can't be sent to
        // remote nodes, so it has to stay above the `MergeScan`.
        if let Err(e) = DFLogicalSubstraitConvertor.encode(plan, DefaultSerializer) {
            debug!(
                "PlanRewriter: plan cannot be converted to substrait with error={e:?}, expanding now: {plan}"
            );
            return Ok(true);
        }

        if self.expand_on_next_call {
            self.expand_on_next_call = false;
            debug!("PlanRewriter: expand_on_next_call is true, expanding now");
            return Ok(true);
        }

        if self.expand_on_next_part_cond_trans_commutative {
            let comm = Categorizer::check_plan(plan, self.partition_cols.clone())?;
            match comm {
                Commutativity::PartialCommutative => {
                    // Another partially commutative plan: let it be staged below,
                    // then expand at the next node.
                    self.expand_on_next_part_cond_trans_commutative = false;
                    self.expand_on_next_call = true;
                }
                Commutativity::ConditionalCommutative(_)
                | Commutativity::TransformedCommutative { .. } => {
                    self.expand_on_next_part_cond_trans_commutative = false;
                    debug!(
                        "PlanRewriter: meet a new conditional/transformed commutative plan, expanding now: {plan}"
                    );
                    return Ok(true);
                }
                _ => (),
            }
        }

        match Categorizer::check_plan(plan, self.partition_cols.clone())? {
            Commutativity::Commutative => {
                // A commutative `SeriesDivide` directly above a staged `MergeSort`
                // makes that merge sort redundant: drop the staged plan along with
                // its recorded column requirements.
                if let LogicalPlan::Extension(ext_a) = plan
                    && ext_a.node.name() == SeriesDivide::name()
                    && let Some(LogicalPlan::Extension(ext_b)) = self.stage.last()
                    && ext_b.node.name() == MergeSortLogicalPlan::name()
                {
                    self.stage.pop();
                    self.expand_on_next_part_cond_trans_commutative = false;
                    self.column_requirements.clear();
                }
            }
            Commutativity::PartialCommutative => {
                if let Some(plan) = partial_commutative_transformer(plan) {
                    self.update_column_requirements(&plan, self.level - 1);
                    self.expand_on_next_part_cond_trans_commutative = true;
                    self.stage.push(plan)
                }
            }
            Commutativity::ConditionalCommutative(transformer) => {
                if let Some(transformer) = transformer
                    && let Some(plan) = transformer(plan)
                {
                    self.update_column_requirements(&plan, self.level - 1);
                    self.expand_on_next_part_cond_trans_commutative = true;
                    self.stage.push(plan)
                }
            }
            Commutativity::TransformedCommutative { transformer } => {
                if let Some(transformer) = transformer {
                    let transformer_actions = transformer(plan)?;
                    debug!(
                        "PlanRewriter: transformed plan: {}\n from {plan}",
                        transformer_actions
                            .extra_parent_plans
                            .iter()
                            .enumerate()
                            .map(|(i, p)| format!(
                                "Extra {i}-th parent plan from parent to child = {}",
                                p.display()
                            ))
                            .collect::<Vec<_>>()
                            .join("\n")
                    );
                    if let Some(new_child_plan) = &transformer_actions.new_child_plan {
                        debug!("PlanRewriter: new child plan: {}", new_child_plan);
                    }
                    if let Some(last_stage) = transformer_actions.extra_parent_plans.last() {
                        // Only record requirements from the innermost extra plan,
                        // the one applied right above the `MergeScan`.
                        self.update_column_requirements(last_stage, self.level - 1);
                    }
                    self.stage
                        .extend(transformer_actions.extra_parent_plans.into_iter().rev());
                    self.expand_on_next_call = true;
                    self.new_child_plan = transformer_actions.new_child_plan;
                }
            }
            Commutativity::NonCommutative
            | Commutativity::Unimplemented
            | Commutativity::Unsupported => {
                debug!("PlanRewriter: meet a non-commutative plan, expanding now: {plan}");
                return Ok(true);
            }
        }

        Ok(false)
    }

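    /// Record every column referenced by `plan`'s expressions as a requirement
    /// at `plan_level`, so the later `EnforceDistRequirementRewriter` pass can
    /// re-add columns that an intermediate projection would otherwise drop.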
    fn update_column_requirements(&mut self, plan: &LogicalPlan, plan_level: usize) {
        debug!(
            "PlanRewriter: update column requirements for plan: {plan}\n with old column_requirements: {:?}",
            self.column_requirements
        );
        let mut container = HashSet::new();
        for expr in plan.expressions() {
            // Best-effort: ignore expressions whose columns cannot be extracted.
            let _ = expr_to_columns(&expr, &mut container);
        }

        self.column_requirements.push((container, plan_level));
        debug!(
            "PlanRewriter: updated column requirements: {:?}",
            self.column_requirements
        );
    }

    fn is_expanded(&self) -> bool {
        self.status == RewriterStatus::Expanded
    }

    fn set_expanded(&mut self) {
        self.status = RewriterStatus::Expanded;
    }

    fn set_unexpanded(&mut self) {
        self.status = RewriterStatus::Unexpanded;
    }

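    /// Track partition columns: on the first `TableScan` of a base table they
    /// are read from the table metadata; on every later node the alias mapping
    /// is updated as columns get renamed or dropped.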
    fn maybe_set_partitions(&mut self, plan: &LogicalPlan) -> DfResult<()> {
        if let Some(part_cols) = &mut self.partition_cols {
            // Partition columns are already known; follow how the current node
            // renames them.
            let child = plan.inputs().first().cloned().ok_or_else(|| {
                datafusion_common::DataFusionError::Internal(format!(
                    "PlanRewriter: maybe_set_partitions: plan has no child: {plan}"
                ))
            })?;

            for (_col_name, alias_set) in part_cols.iter_mut() {
                let aliased_cols = aliased_columns_for(
                    &alias_set.clone().into_iter().collect(),
                    plan,
                    Some(child),
                )?;
                *alias_set = aliased_cols.into_values().flatten().collect();
            }

            debug!(
                "PlanRewriter: maybe_set_partitions: updated partition columns: {:?} at plan: {}",
                part_cols,
                plan.display()
            );

            return Ok(());
        }

        if let LogicalPlan::TableScan(table_scan) = plan
            && let Some(source) = table_scan
                .source
                .as_any()
                .downcast_ref::<DefaultTableSource>()
            && let Some(provider) = source
                .table_provider
                .as_any()
                .downcast_ref::<DfTableProviderAdapter>()
        {
            let table = provider.table();
            if table.table_type() == TableType::Base {
                let info = table.table_info();
                let partition_key_indices = info.meta.partition_key_indices.clone();
                let schema = info.meta.schema.clone();
                let mut partition_cols = partition_key_indices
                    .into_iter()
                    .map(|index| schema.column_name_by_index(index).to_string())
                    .collect::<Vec<String>>();

                let partition_rules = table.partition_rules();
                let exist_phy_part_cols_not_in_logical_table = partition_rules
                    .map(|r| !r.extra_phy_cols_not_in_logical_table.is_empty())
                    .unwrap_or(false);

                if exist_phy_part_cols_not_in_logical_table && partition_cols.is_empty() {
                    // The physical table is partitioned on columns the logical
                    // table doesn't expose; insert a placeholder so the plan is
                    // still treated as partitioned.
                    partition_cols.push(OTHER_PHY_PART_COL_PLACEHOLDER.to_string());
                }
                self.partition_cols = Some(
                    partition_cols
                        .into_iter()
                        .map(|c| {
                            if c == OTHER_PHY_PART_COL_PLACEHOLDER {
                                return Ok((c.clone(), BTreeSet::new()));
                            }
                            // A partition column may be absent from the scan's
                            // output schema; record it with an empty alias set.
                            let Some(index) = plan.schema().index_of_column_by_name(None, &c)
                            else {
                                return Ok((c.clone(), BTreeSet::new()));
                            };
                            let column = plan.schema().columns().get(index).cloned().ok_or_else(|| {
                                datafusion_common::DataFusionError::Internal(format!(
                                    "PlanRewriter: maybe_set_partitions: column index {index} out of bounds in schema of plan: {plan}"
                                ))
                            })?;
                            Ok((c.clone(), BTreeSet::from([column])))
                        })
                        .collect::<DfResult<AliasMapping>>()?,
                );
            }
        }

        Ok(())
    }

    fn pop_stack(&mut self) {
        self.level -= 1;
        self.stack.pop();
    }

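    /// Expand `on_node` into its distributed form: enforce pending column
    /// requirements, wrap the node in a `MergeScan`, re-apply the staged plans
    /// on top, and finally project back to the original output schema.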
    fn expand(&mut self, mut on_node: LogicalPlan) -> DfResult<LogicalPlan> {
        // Keep the original output schema so it can be restored at the end.
        let schema = on_node.schema().clone();
        if let Some(new_child_plan) = self.new_child_plan.take() {
            // Use the replacement child produced by a transformer, if any.
            on_node = new_child_plan;
        }
        let mut rewriter = EnforceDistRequirementRewriter::new(
            std::mem::take(&mut self.column_requirements),
            self.level,
        );
        debug!(
            "PlanRewriter: enforce column requirements for node: {on_node} with rewriter: {rewriter:?}"
        );
        on_node = on_node.rewrite(&mut rewriter)?.data;
        debug!(
            "PlanRewriter: after enforced column requirements with rewriter: {rewriter:?} for node:\n{on_node}"
        );

        debug!(
            "PlanRewriter: expand on node: {on_node} with partition col alias mapping: {:?}",
            self.partition_cols
        );

        let mut node = MergeScanLogicalPlan::new(
            on_node.clone(),
            false,
            self.partition_cols.clone().unwrap_or_default(),
        )
        .into_logical_plan();

        // Re-apply staged plans on top of the `MergeScan`, from innermost out.
        for new_stage in self.stage.drain(..) {
            // A staged `MergeSort` still refers to pre-expansion columns; remap
            // its sort expressions onto the expanded node first.
            let new_stage = if let LogicalPlan::Extension(ext) = &new_stage
                && let Some(merge_sort) = ext.node.as_any().downcast_ref::<MergeSortLogicalPlan>()
            {
                rewrite_merge_sort_exprs(merge_sort, &on_node)?
            } else {
                new_stage
            };
            node = new_stage
                .with_new_exprs(new_stage.expressions_consider_join(), vec![node.clone()])?;
        }
        self.set_expanded();

        // Project back to the original schema, dropping columns that were only
        // added to satisfy distributed requirements.
        let node = LogicalPlanBuilder::from(node)
            .project(schema.iter().map(|(qualifier, field)| {
                Expr::Column(Column::new(qualifier.cloned(), field.name()))
            }))?
            .build()?;

        Ok(node)
    }
}

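/// Rewriter that enforces the column requirements collected by [`PlanRewriter`].
///
/// While descending it records the plan seen at every level; on the way back up
/// it appends required-but-missing columns to projection nodes (tracking
/// aliases between levels), so that the plans staged above the `MergeScan`
/// still see every column they reference.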
#[derive(Debug)]
struct EnforceDistRequirementRewriter {
    /// Required columns and the level they were recorded at by `PlanRewriter`.
    column_requirements: Vec<(HashSet<Column>, usize)>,
    /// Level of the node currently being visited; starts at the expansion
    /// point's level and is adjusted in `f_down`/`f_up`.
    cur_level: usize,
    /// Plan recorded at each level on the way down, used to resolve aliases
    /// between a requirement's origin and the current node.
    plan_per_level: BTreeMap<usize, LogicalPlan>,
}

impl EnforceDistRequirementRewriter {
    fn new(column_requirements: Vec<(HashSet<Column>, usize)>, cur_level: usize) -> Self {
        debug!(
            "Create EnforceDistRequirementRewriter with column_requirements: {:?} at cur_level: {}",
            column_requirements, cur_level
        );
        Self {
            column_requirements,
            cur_level,
            plan_per_level: BTreeMap::new(),
        }
    }

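    /// Collect the column requirements applicable at the current node: those
    /// recorded at this level or deeper, mapped through any aliases between the
    /// plan where they were recorded and the current node. The result is keyed
    /// by `(original column, level)` since the same column may be required at
    /// several levels under different aliases.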
    fn get_current_applicable_column_requirements(
        &self,
        node: &LogicalPlan,
    ) -> DfResult<BTreeMap<(Column, usize), BTreeSet<Column>>> {
        let col_req_per_level = self
            .column_requirements
            .iter()
            .filter(|(_, level)| *level >= self.cur_level)
            .collect::<Vec<_>>();

        let mut result_alias_mapping = BTreeMap::new();
        // A leaf node has no child to resolve aliases against.
        let Some(child) = node.inputs().first().cloned() else {
            return Ok(Default::default());
        };
        for (col_req, level) in col_req_per_level {
            if let Some(original) = self.plan_per_level.get(level) {
                // Resolve each required column to its aliases at the current node.
                let aliased_cols =
                    aliased_columns_for(&col_req.iter().cloned().collect(), node, Some(original))?;
                for original_col in col_req {
                    let aliased_cols = aliased_cols.get(original_col).cloned();
                    if let Some(cols) = aliased_cols
                        && !cols.is_empty()
                    {
                        result_alias_mapping.insert((original_col.clone(), *level), cols);
                    } else {
                        // Not visible at this node: the projection may simply not
                        // output the column yet, so resolve it against the child
                        // instead.
                        let aliases_in_child = aliased_columns_for(
                            &[original_col.clone()].into(),
                            child,
                            Some(original),
                        )?;
                        let Some(aliases) = aliases_in_child
                            .get(original_col)
                            .cloned()
                            .filter(|a| !a.is_empty())
                        else {
                            return Err(datafusion_common::DataFusionError::Internal(format!(
                                "EnforceDistRequirementRewriter: no alias found for required column {original_col} at level {level} in current node's child plan: \n{child} from original plan: \n{original}",
                            )));
                        };

                        result_alias_mapping.insert((original_col.clone(), *level), aliases);
                    }
                }
            }
        }
        Ok(result_alias_mapping)
    }
}

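// `f_down` records the plan at each level; `f_up` patches projections so no
// required column gets dropped on the way back up.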
impl TreeNodeRewriter for EnforceDistRequirementRewriter {
    type Node = LogicalPlan;

    fn f_down(&mut self, node: Self::Node) -> DfResult<Transformed<Self::Node>> {
        // Only straight-line (single-input) plans are expected below the
        // expansion point.
        if node.inputs().len() > 1 {
            return Err(datafusion_common::DataFusionError::Internal(
                "EnforceDistRequirementRewriter: node with multiple inputs is not supported"
                    .to_string(),
            ));
        }
        self.plan_per_level.insert(self.cur_level, node.clone());
        self.cur_level += 1;
        Ok(Transformed::no(node))
    }

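    // Going back up: only projections can drop columns, so append any
    // still-missing required columns to them.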
    fn f_up(&mut self, node: Self::Node) -> DfResult<Transformed<Self::Node>> {
        self.cur_level -= 1;
        if let LogicalPlan::Projection(ref projection) = node {
            let mut applicable_column_requirements =
                self.get_current_applicable_column_requirements(&node)?;

            debug!(
                "EnforceDistRequirementRewriter: applicable column requirements at level {} = {:?} for node {}",
                self.cur_level,
                applicable_column_requirements,
                node.display()
            );

            for expr in &projection.expr {
                let (qualifier, name) = expr.qualified_name();
                let column = Column::new(qualifier, name);
                // A requirement is satisfied once any of its aliases is already
                // in the projection's output.
                applicable_column_requirements
                    .retain(|_col_level, alias_set| !alias_set.contains(&column));
            }
            if applicable_column_requirements.is_empty() {
                return Ok(Transformed::no(node));
            }

            // Append one alias of each missing column to the projection.
            let mut new_exprs = projection.expr.clone();
            for (col, alias_set) in &applicable_column_requirements {
                new_exprs.push(Expr::Column(alias_set.first().cloned().ok_or_else(
                    || {
                        datafusion_common::DataFusionError::Internal(
                            format!("EnforceDistRequirementRewriter: alias set is empty for column {col:?} in node {node}"),
                        )
                    },
                )?));
            }
            let new_node =
                node.with_new_exprs(new_exprs, node.inputs().into_iter().cloned().collect())?;
            debug!(
                "EnforceDistRequirementRewriter: added missing columns {:?} to projection node from old node: \n{node}\n Making new node: \n{new_node}",
                applicable_column_requirements
            );

            // Keep the recorded plan for this level in sync with the rewrite.
            self.plan_per_level.insert(self.cur_level, new_node.clone());

            return Ok(Transformed::yes(new_node));
        }
        Ok(Transformed::no(node))
    }
}

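// The driving rewriter: visits the plan bottom-up and expands a node's subtree
// into a `MergeScan` as soon as its parent can no longer commute with it.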
impl TreeNodeRewriter for PlanRewriter {
    type Node = LogicalPlan;

    fn f_down(&mut self, node: Self::Node) -> DfResult<Transformed<Self::Node>> {
        self.level += 1;
        self.stack.push((node.clone(), self.level));
        // Entering a new subtree: clear any state left from sibling subtrees.
        self.stage.clear();
        self.set_unexpanded();
        self.partition_cols = None;
        Ok(Transformed::no(node))
    }

    fn f_up(&mut self, node: Self::Node) -> DfResult<Transformed<Self::Node>> {
        // This subtree is already expanded; pass the node through unchanged.
        if self.is_expanded() {
            self.pop_stack();
            return Ok(Transformed::no(node));
        }

        // A leaf other than a table scan (e.g. an empty relation) has nothing
        // to push down; mark it expanded as-is.
        if node.inputs().is_empty() && !matches!(node, LogicalPlan::TableScan(_)) {
            self.set_expanded();
            self.pop_stack();
            return Ok(Transformed::no(node));
        }

        self.maybe_set_partitions(&node)?;

        let Some(parent) = self.get_parent() else {
            debug!("PlanRewriter: no parent found, expanding now: {node}");
            let node = self.expand(node);
            debug!(
                "PlanRewriter: expanded plan: {}",
                match &node {
                    Ok(n) => n.to_string(),
                    Err(e) => format!("Error expanding plan: {e}"),
                }
            );
            let node = node?;
            self.pop_stack();
            return Ok(Transformed::yes(node));
        };

        let parent = parent.clone();

        if self.should_expand(&parent)? {
            debug!(
                "PlanRewriter: should expand child:\n {node}\n Of Parent: {}",
                parent.display()
            );
            let node = self.expand(node);
            debug!(
                "PlanRewriter: expanded plan: {}",
                match &node {
                    Ok(n) => n.to_string(),
                    Err(e) => format!("Error expanding plan: {e}"),
                }
            );
            let node = node?;
            self.pop_stack();
            return Ok(Transformed::yes(node));
        }

        self.pop_stack();
        Ok(Transformed::no(node))
    }
}