query/dist_plan/merge_sort.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Merge sort logical plan for distributed query execution. It roughly corresponds to the
//! `SortPreservingMergeExec` operator in DataFusion.
//!
use std::fmt;
use std::sync::Arc;
use datafusion_common::{DataFusionError, Result};
use datafusion_expr::{Expr, Extension, LogicalPlan, UserDefinedLogicalNodeCore};
/// MergeSort logical plan. It has the same fields as `Sort`, but indicates a merge sort:
/// each input partition is assumed to already be a sorted stream, and
/// `SortPreservingMergeExec` will be used to merge them into a single sorted stream.
#[derive(Hash, PartialEq, Eq, Clone)]
pub struct MergeSortLogicalPlan {
    /// Expressions describing the ordering; passed through as the `expr` of the `Sort`
    /// plan built by `into_sort`.
    pub expr: Vec<Expr>,
    /// The single child plan whose output is merged.
    pub input: Arc<LogicalPlan>,
    /// Optional fetch value mirroring the `fetch` field of `Sort`; rendered as `fetch=N`
    /// in the explain output when present.
    pub fetch: Option<usize>,
}
impl fmt::Debug for MergeSortLogicalPlan {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
UserDefinedLogicalNodeCore::fmt_for_explain(self, f)
}
}
impl MergeSortLogicalPlan {
    /// Creates a merge sort node over `input`, ordered by `expr`, with an optional `fetch`.
    pub fn new(input: Arc<LogicalPlan>, expr: Vec<Expr>, fetch: Option<usize>) -> Self {
        Self { input, expr, fetch }
    }

    /// Static name of this plan node (`"MergeSort"`).
    pub fn name() -> &'static str {
        "MergeSort"
    }

    /// Create a [`LogicalPlan::Extension`] node from this merge sort plan
    pub fn into_logical_plan(self) -> LogicalPlan {
        LogicalPlan::Extension(Extension {
            node: Arc::new(self),
        })
    }

    /// Convert self to a [`LogicalPlan::Sort`] plan with the same input, expressions and
    /// fetch.
    pub fn into_sort(self) -> LogicalPlan {
        // `self` is consumed, so the fields can be moved out directly instead of cloning
        // the input `Arc`.
        let Self { expr, input, fetch } = self;
        LogicalPlan::Sort(datafusion::logical_expr::Sort { input, expr, fetch })
    }
}
impl UserDefinedLogicalNodeCore for MergeSortLogicalPlan {
    /// Returns the node name, delegating to the inherent `MergeSortLogicalPlan::name`.
    fn name(&self) -> &str {
        Self::name()
    }

    /// Returns the single child plan.
    ///
    /// Exposing the child allows optimization to reach it.
    fn inputs(&self) -> Vec<&LogicalPlan> {
        vec![self.input.as_ref()]
    }

    /// The schema of this node is the schema of its input, unchanged.
    fn schema(&self) -> &datafusion_common::DFSchemaRef {
        self.input.schema()
    }

    /// Returns a copy of the sort expressions.
    ///
    /// Exposing the expressions allows further optimization of them.
    fn expressions(&self) -> Vec<datafusion_expr::Expr> {
        self.expr.clone()
    }

    /// Writes `MergeSort: <expr>, <expr>, ...` followed by `, fetch=N` when a fetch is set.
    fn fmt_for_explain(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "MergeSort: ")?;
        for (i, expr_item) in self.expr.iter().enumerate() {
            if i > 0 {
                write!(f, ", ")?;
            }
            write!(f, "{expr_item}")?;
        }
        if let Some(a) = self.fetch {
            write!(f, ", fetch={a}")?;
        }
        Ok(())
    }

    /// Builds a new node from `exprs` and `inputs`, keeping the current `fetch`.
    ///
    /// # Errors
    ///
    /// Returns [`DataFusionError::Internal`] unless `inputs` contains exactly one plan.
    fn with_exprs_and_inputs(
        &self,
        exprs: Vec<datafusion::prelude::Expr>,
        inputs: Vec<LogicalPlan>,
    ) -> Result<Self> {
        // A merge sort has exactly one child: reject both a missing input and any surplus
        // inputs instead of silently dropping the extras.
        let mut remaining = inputs.into_iter();
        let input = match (remaining.next(), remaining.next()) {
            (Some(only), None) => only,
            _ => {
                return Err(DataFusionError::Internal(
                    "Expected exactly one input with MergeSort".to_string(),
                ))
            }
        };

        // Build the node directly rather than cloning `self` and overwriting its fields.
        Ok(Self {
            expr: exprs,
            input: Arc::new(input),
            fetch: self.fetch,
        })
    }
}
/// Rewrites a [`LogicalPlan::Sort`] node into an equivalent `MergeSort` extension node.
///
/// The sort's input, expressions and fetch are carried over. Returns `None` for any
/// other kind of plan.
pub fn merge_sort_transformer(plan: &LogicalPlan) -> Option<LogicalPlan> {
    match plan {
        LogicalPlan::Sort(sort) => {
            let merge_sort =
                MergeSortLogicalPlan::new(sort.input.clone(), sort.expr.clone(), sort.fetch);
            Some(merge_sort.into_logical_plan())
        }
        _ => None,
    }
}