query/dist_plan/
merge_sort.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Merge sort logical plan for distributed query execution, roughly corresponding to the
16//! `SortPreservingMergeExec` operator in datafusion
17//!
18
19use std::fmt;
20use std::sync::Arc;
21
22use datafusion_common::{DataFusionError, Result};
23use datafusion_expr::{Extension, LogicalPlan, SortExpr, UserDefinedLogicalNodeCore};
24
/// MergeSort logical plan. It has the same fields as `Sort`, but indicates a merge sort:
/// each input partition is assumed to be an already sorted stream, and
/// `SortPreservingMergeExec` is used to merge them into a single sorted stream.
#[derive(Hash, PartialOrd, PartialEq, Eq, Clone)]
pub struct MergeSortLogicalPlan {
    /// Sort expressions describing the ordering of the merged output.
    pub expr: Vec<SortExpr>,
    /// The single input plan of this node.
    pub input: Arc<LogicalPlan>,
    /// Optional fetch value, carried over unchanged from/to the `fetch` field of `Sort`
    /// and shown as `fetch=N` in the explain output.
    pub fetch: Option<usize>,
}
34
35impl fmt::Debug for MergeSortLogicalPlan {
36    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
37        UserDefinedLogicalNodeCore::fmt_for_explain(self, f)
38    }
39}
40
41impl MergeSortLogicalPlan {
42    pub fn new(input: Arc<LogicalPlan>, expr: Vec<SortExpr>, fetch: Option<usize>) -> Self {
43        Self { input, expr, fetch }
44    }
45
46    pub fn name() -> &'static str {
47        "MergeSort"
48    }
49
50    /// Create a [`LogicalPlan::Extension`] node from this merge sort plan
51    pub fn into_logical_plan(self) -> LogicalPlan {
52        LogicalPlan::Extension(Extension {
53            node: Arc::new(self),
54        })
55    }
56
57    /// Convert self to a [`Sort`] logical plan with same input and expressions
58    pub fn into_sort(self) -> LogicalPlan {
59        LogicalPlan::Sort(datafusion::logical_expr::Sort {
60            input: self.input.clone(),
61            expr: self.expr,
62            fetch: self.fetch,
63        })
64    }
65}
66
67impl UserDefinedLogicalNodeCore for MergeSortLogicalPlan {
68    fn name(&self) -> &str {
69        Self::name()
70    }
71
72    // Allow optimization here
73    fn inputs(&self) -> Vec<&LogicalPlan> {
74        vec![self.input.as_ref()]
75    }
76
77    fn schema(&self) -> &datafusion_common::DFSchemaRef {
78        self.input.schema()
79    }
80
81    // Allow further optimization
82    fn expressions(&self) -> Vec<datafusion_expr::Expr> {
83        self.expr.iter().map(|sort| sort.expr.clone()).collect()
84    }
85
86    fn fmt_for_explain(&self, f: &mut fmt::Formatter) -> fmt::Result {
87        write!(f, "MergeSort: ")?;
88        for (i, expr_item) in self.expr.iter().enumerate() {
89            if i > 0 {
90                write!(f, ", ")?;
91            }
92            write!(f, "{expr_item}")?;
93        }
94        if let Some(a) = self.fetch {
95            write!(f, ", fetch={a}")?;
96        }
97        Ok(())
98    }
99
100    fn with_exprs_and_inputs(
101        &self,
102        exprs: Vec<datafusion::prelude::Expr>,
103        mut inputs: Vec<LogicalPlan>,
104    ) -> Result<Self> {
105        let mut zelf = self.clone();
106        zelf.expr = zelf
107            .expr
108            .into_iter()
109            .zip(exprs)
110            .map(|(sort, expr)| sort.with_expr(expr))
111            .collect();
112        zelf.input = Arc::new(inputs.pop().ok_or_else(|| {
113            DataFusionError::Internal("Expected exactly one input with MergeSort".to_string())
114        })?);
115        Ok(zelf)
116    }
117}
118
119/// Turn `Sort` into `MergeSort` if possible
120pub fn merge_sort_transformer(plan: &LogicalPlan) -> Option<LogicalPlan> {
121    if let LogicalPlan::Sort(sort) = plan {
122        Some(
123            MergeSortLogicalPlan::new(sort.input.clone(), sort.expr.clone(), sort.fetch)
124                .into_logical_plan(),
125        )
126    } else {
127        None
128    }
129}