query/dist_plan/
merge_sort.rs1use std::fmt;
20use std::sync::Arc;
21
22use datafusion_common::{DataFusionError, Result};
23use datafusion_expr::{Extension, LogicalPlan, SortExpr, UserDefinedLogicalNodeCore};
24
25#[derive(Hash, PartialOrd, PartialEq, Eq, Clone)]
29pub struct MergeSortLogicalPlan {
30    pub expr: Vec<SortExpr>,
31    pub input: Arc<LogicalPlan>,
32    pub fetch: Option<usize>,
33}
34
35impl fmt::Debug for MergeSortLogicalPlan {
36    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
37        UserDefinedLogicalNodeCore::fmt_for_explain(self, f)
38    }
39}
40
41impl MergeSortLogicalPlan {
42    pub fn new(input: Arc<LogicalPlan>, expr: Vec<SortExpr>, fetch: Option<usize>) -> Self {
43        Self { input, expr, fetch }
44    }
45
46    pub fn name() -> &'static str {
47        "MergeSort"
48    }
49
50    pub fn into_logical_plan(self) -> LogicalPlan {
52        LogicalPlan::Extension(Extension {
53            node: Arc::new(self),
54        })
55    }
56
57    pub fn into_sort(self) -> LogicalPlan {
59        LogicalPlan::Sort(datafusion::logical_expr::Sort {
60            input: self.input.clone(),
61            expr: self.expr,
62            fetch: self.fetch,
63        })
64    }
65}
66
67impl UserDefinedLogicalNodeCore for MergeSortLogicalPlan {
68    fn name(&self) -> &str {
69        Self::name()
70    }
71
72    fn inputs(&self) -> Vec<&LogicalPlan> {
74        vec![self.input.as_ref()]
75    }
76
77    fn schema(&self) -> &datafusion_common::DFSchemaRef {
78        self.input.schema()
79    }
80
81    fn expressions(&self) -> Vec<datafusion_expr::Expr> {
83        self.expr.iter().map(|sort| sort.expr.clone()).collect()
84    }
85
86    fn fmt_for_explain(&self, f: &mut fmt::Formatter) -> fmt::Result {
87        write!(f, "MergeSort: ")?;
88        for (i, expr_item) in self.expr.iter().enumerate() {
89            if i > 0 {
90                write!(f, ", ")?;
91            }
92            write!(f, "{expr_item}")?;
93        }
94        if let Some(a) = self.fetch {
95            write!(f, ", fetch={a}")?;
96        }
97        Ok(())
98    }
99
100    fn with_exprs_and_inputs(
101        &self,
102        exprs: Vec<datafusion::prelude::Expr>,
103        mut inputs: Vec<LogicalPlan>,
104    ) -> Result<Self> {
105        let mut zelf = self.clone();
106        zelf.expr = zelf
107            .expr
108            .into_iter()
109            .zip(exprs)
110            .map(|(sort, expr)| sort.with_expr(expr))
111            .collect();
112        zelf.input = Arc::new(inputs.pop().ok_or_else(|| {
113            DataFusionError::Internal("Expected exactly one input with MergeSort".to_string())
114        })?);
115        Ok(zelf)
116    }
117}
118
119pub fn merge_sort_transformer(plan: &LogicalPlan) -> Option<LogicalPlan> {
121    if let LogicalPlan::Sort(sort) = plan {
122        Some(
123            MergeSortLogicalPlan::new(sort.input.clone(), sort.expr.clone(), sort.fetch)
124                .into_logical_plan(),
125        )
126    } else {
127        None
128    }
129}