query/dist_plan/
merge_sort.rs1use std::fmt;
20use std::sync::Arc;
21
22use datafusion_common::{DataFusionError, Result};
23use datafusion_expr::{Extension, LogicalPlan, SortExpr, UserDefinedLogicalNodeCore};
24
25#[derive(Hash, PartialOrd, PartialEq, Eq, Clone)]
29pub struct MergeSortLogicalPlan {
30 pub expr: Vec<SortExpr>,
31 pub input: Arc<LogicalPlan>,
32 pub fetch: Option<usize>,
33}
34
35impl fmt::Debug for MergeSortLogicalPlan {
36 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
37 UserDefinedLogicalNodeCore::fmt_for_explain(self, f)
38 }
39}
40
41impl MergeSortLogicalPlan {
42 pub fn new(input: Arc<LogicalPlan>, expr: Vec<SortExpr>, fetch: Option<usize>) -> Self {
43 Self { input, expr, fetch }
44 }
45
46 pub fn name() -> &'static str {
47 "MergeSort"
48 }
49
50 pub fn into_logical_plan(self) -> LogicalPlan {
52 LogicalPlan::Extension(Extension {
53 node: Arc::new(self),
54 })
55 }
56
57 pub fn into_sort(self) -> LogicalPlan {
59 LogicalPlan::Sort(datafusion::logical_expr::Sort {
60 input: self.input.clone(),
61 expr: self.expr,
62 fetch: self.fetch,
63 })
64 }
65}
66
67impl UserDefinedLogicalNodeCore for MergeSortLogicalPlan {
68 fn name(&self) -> &str {
69 Self::name()
70 }
71
72 fn inputs(&self) -> Vec<&LogicalPlan> {
74 vec![self.input.as_ref()]
75 }
76
77 fn schema(&self) -> &datafusion_common::DFSchemaRef {
78 self.input.schema()
79 }
80
81 fn expressions(&self) -> Vec<datafusion_expr::Expr> {
83 self.expr.iter().map(|sort| sort.expr.clone()).collect()
84 }
85
86 fn fmt_for_explain(&self, f: &mut fmt::Formatter) -> fmt::Result {
87 write!(f, "MergeSort: ")?;
88 for (i, expr_item) in self.expr.iter().enumerate() {
89 if i > 0 {
90 write!(f, ", ")?;
91 }
92 write!(f, "{expr_item}")?;
93 }
94 if let Some(a) = self.fetch {
95 write!(f, ", fetch={a}")?;
96 }
97 Ok(())
98 }
99
100 fn with_exprs_and_inputs(
101 &self,
102 exprs: Vec<datafusion::prelude::Expr>,
103 mut inputs: Vec<LogicalPlan>,
104 ) -> Result<Self> {
105 let mut zelf = self.clone();
106 zelf.expr = zelf
107 .expr
108 .into_iter()
109 .zip(exprs)
110 .map(|(sort, expr)| sort.with_expr(expr))
111 .collect();
112 zelf.input = Arc::new(inputs.pop().ok_or_else(|| {
113 DataFusionError::Internal("Expected exactly one input with MergeSort".to_string())
114 })?);
115 Ok(zelf)
116 }
117}
118
119pub fn merge_sort_transformer(plan: &LogicalPlan) -> Option<LogicalPlan> {
121 if let LogicalPlan::Sort(sort) = plan {
122 Some(
123 MergeSortLogicalPlan::new(sort.input.clone(), sort.expr.clone(), sort.fetch)
124 .into_logical_plan(),
125 )
126 } else {
127 None
128 }
129}