Skip to main content

query/optimizer/
windowed_sort.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16use std::sync::Arc;
17
18use datafusion::physical_optimizer::PhysicalOptimizerRule;
19use datafusion::physical_plan::ExecutionPlan;
20use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
21use datafusion::physical_plan::coop::CooperativeExec;
22use datafusion::physical_plan::filter::FilterExec;
23use datafusion::physical_plan::projection::ProjectionExec;
24use datafusion::physical_plan::repartition::RepartitionExec;
25use datafusion::physical_plan::sorts::sort::SortExec;
26use datafusion::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
27use datafusion_common::Result as DataFusionResult;
28use datafusion_common::tree_node::{Transformed, TreeNode};
29use datafusion_physical_expr::expressions::Column as PhysicalColumn;
30use store_api::region_engine::PartitionRange;
31use table::table::scan::RegionScanExec;
32
33use crate::part_sort::PartSortExec;
34use crate::window_sort::WindowedSortExec;
35
/// Optimize rule for windowed sort.
///
/// This is expected to run after [`ScanHint`] and [`ParallelizeScan`].
/// It replaces the original sort with a custom plan. To make sure
/// other rules are applied correctly, this rule should run as late as
/// possible.
///
/// [`ScanHint`]: crate::optimizer::scan_hint::ScanHintRule
/// [`ParallelizeScan`]: crate::optimizer::parallelize_scan::ParallelizeScan
#[derive(Debug)]
pub struct WindowedSortPhysicalRule;
47
48impl PhysicalOptimizerRule for WindowedSortPhysicalRule {
49    fn optimize(
50        &self,
51        plan: Arc<dyn ExecutionPlan>,
52        config: &datafusion::config::ConfigOptions,
53    ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
54        Self::do_optimize(plan, config)
55    }
56
57    fn name(&self) -> &str {
58        "WindowedSortRule"
59    }
60
61    fn schema_check(&self) -> bool {
62        false
63    }
64}
65
66impl WindowedSortPhysicalRule {
67    fn do_optimize(
68        plan: Arc<dyn ExecutionPlan>,
69        _config: &datafusion::config::ConfigOptions,
70    ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
71        let result = plan
72            .transform_down(|plan| {
73                if let Some(sort_exec) = plan.as_any().downcast_ref::<SortExec>() {
74                    // TODO: support multiple expr in windowed sort
75                    if sort_exec.expr().len() != 1 {
76                        return Ok(Transformed::no(plan));
77                    }
78
79                    let preserve_partitioning = sort_exec.preserve_partitioning();
80
81                    let sort_input = remove_repartition(sort_exec.input().clone())?.data;
82
83                    // Gets scanner info from the input without repartition before filter.
84                    let Some(scanner_info) = fetch_partition_range(sort_input.clone())? else {
85                        return Ok(Transformed::no(plan));
86                    };
87                    let input_schema = sort_input.schema();
88
89                    let first_sort_expr = sort_exec.expr().first();
90                    if let Some(column_expr) = first_sort_expr
91                        .expr
92                        .as_any()
93                        .downcast_ref::<PhysicalColumn>()
94                        && scanner_info
95                            .time_index
96                            .contains(input_schema.field(column_expr.index()).name())
97                        && sort_exec.fetch().is_none()
98                    // skip if there is a limit, as dyn filter along is good enough in this case
99                    {
100                    } else {
101                        return Ok(Transformed::no(plan));
102                    }
103
104                    // PartSortExec is unnecessary if:
105                    // - there is no tag column, and
106                    // - the sort is ascending on the time index column
107                    let new_input = if scanner_info.tag_columns.is_empty()
108                        && !first_sort_expr.options.descending
109                    {
110                        sort_input
111                    } else {
112                        Arc::new(PartSortExec::try_new(
113                            first_sort_expr.clone(),
114                            sort_exec.fetch(),
115                            scanner_info.partition_ranges.clone(),
116                            sort_input,
117                        )?)
118                    };
119
120                    let windowed_sort_exec = WindowedSortExec::try_new(
121                        first_sort_expr.clone(),
122                        sort_exec.fetch(),
123                        scanner_info.partition_ranges,
124                        new_input,
125                    )?;
126
127                    if !preserve_partitioning {
128                        let order_preserving_merge = SortPreservingMergeExec::new(
129                            sort_exec.expr().clone(),
130                            Arc::new(windowed_sort_exec),
131                        );
132                        return Ok(Transformed {
133                            data: Arc::new(order_preserving_merge),
134                            transformed: true,
135                            tnr: datafusion_common::tree_node::TreeNodeRecursion::Stop,
136                        });
137                    } else {
138                        return Ok(Transformed {
139                            data: Arc::new(windowed_sort_exec),
140                            transformed: true,
141                            tnr: datafusion_common::tree_node::TreeNodeRecursion::Stop,
142                        });
143                    }
144                }
145
146                Ok(Transformed::no(plan))
147            })?
148            .data;
149
150        Ok(result)
151    }
152}
153
/// Information gathered from the input of a sort by `fetch_partition_range`.
#[derive(Debug)]
struct ScannerInfo {
    /// Partition ranges taken from the region scan
    /// (`get_uncollapsed_partition_ranges`).
    partition_ranges: Vec<Vec<PartitionRange>>,
    /// Column names that refer to the time index, after projection aliases
    /// have been resolved.
    time_index: HashSet<String>,
    /// Tag columns reported by the region scan.
    tag_columns: Vec<String>,
}
160
161fn fetch_partition_range(input: Arc<dyn ExecutionPlan>) -> DataFusionResult<Option<ScannerInfo>> {
162    let mut partition_ranges = None;
163    let mut time_index = HashSet::new();
164    let mut alias_map = Vec::new();
165    let mut tag_columns = None;
166
167    input.transform_up(|plan| {
168        if plan.as_any().is::<CooperativeExec>() {
169            return Ok(Transformed::no(plan));
170        }
171
172        // Unappliable case, reset the state.
173        if plan.as_any().is::<RepartitionExec>()
174            || plan.as_any().is::<CoalescePartitionsExec>()
175            || plan.as_any().is::<SortExec>()
176            || plan.as_any().is::<WindowedSortExec>()
177        {
178            partition_ranges = None;
179        }
180
181        // only a very limited set of plans can exist between region scan and sort exec
182        // other plans might make this optimize wrong, so be safe here by limiting it
183        if !(plan.as_any().is::<ProjectionExec>() || plan.as_any().is::<FilterExec>()) {
184            partition_ranges = None;
185        }
186
187        // TODO(discord9): do this in logical plan instead as it's lessy bugy there
188        // Collects alias of the time index column.
189        if let Some(projection) = plan.as_any().downcast_ref::<ProjectionExec>() {
190            for expr in projection.expr() {
191                if let Some(column_expr) = expr.expr.as_any().downcast_ref::<PhysicalColumn>() {
192                    alias_map.push((column_expr.name().to_string(), expr.alias.clone()));
193                }
194            }
195            // resolve alias properly
196            time_index = resolve_alias(&alias_map, &time_index);
197        }
198
199        if let Some(region_scan_exec) = plan.as_any().downcast_ref::<RegionScanExec>() {
200            // `PerSeries` distribution is not supported in windowed sort.
201            if region_scan_exec.distribution()
202                == Some(store_api::storage::TimeSeriesDistribution::PerSeries)
203            {
204                partition_ranges = None;
205                return Ok(Transformed::no(plan));
206            }
207
208            partition_ranges = Some(region_scan_exec.get_uncollapsed_partition_ranges());
209            // Reset time index column.
210            time_index = HashSet::from([region_scan_exec.time_index()]);
211            tag_columns = Some(region_scan_exec.tag_columns());
212
213            region_scan_exec.with_distinguish_partition_range(true);
214        }
215
216        Ok(Transformed::no(plan))
217    })?;
218
219    let result = try {
220        ScannerInfo {
221            partition_ranges: partition_ranges?,
222            time_index,
223            tag_columns: tag_columns?,
224        }
225    };
226
227    Ok(result)
228}
229
230/// Removes the repartition plan between the filter and region scan.
231fn remove_repartition(
232    plan: Arc<dyn ExecutionPlan>,
233) -> DataFusionResult<Transformed<Arc<dyn ExecutionPlan>>> {
234    plan.transform_down(|plan| {
235        if plan.as_any().is::<FilterExec>() {
236            // Checks child.
237            let maybe_repartition = plan.children()[0];
238            if maybe_repartition.as_any().is::<RepartitionExec>() {
239                let maybe_scan = maybe_repartition.children()[0];
240                if maybe_scan.as_any().is::<RegionScanExec>() {
241                    let new_filter = plan.clone().with_new_children(vec![maybe_scan.clone()])?;
242                    return Ok(Transformed::yes(new_filter));
243                }
244            }
245        }
246
247        Ok(Transformed::no(plan))
248    })
249}
250
/// Resolves aliases of the time index column.
///
/// `alias_map` holds `(source, output)` column-name pairs and `time_index`
/// the names currently known to refer to the time index. The result contains:
/// - the `output` of every pair whose `source` is in `time_index`, and
/// - every name of `time_index` that is not overwritten, i.e. that is not the
///   `output` of a pair whose `source` is a different, non-time-index name.
///
/// All lookups go against the given `time_index`, so aliases are not followed
/// transitively: with `a` as time index and aliases `{a: b, b: c}` the result
/// is `{a, b}`, not `{a, c}`. With aliases `{b: a}` and `a` as time index the
/// result is empty.
fn resolve_alias(alias_map: &[(String, String)], time_index: &HashSet<String>) -> HashSet<String> {
    // Output names produced directly from a time index column.
    let renamed = alias_map
        .iter()
        .filter(|(source, _)| time_index.contains(source))
        .map(|(_, output)| output.clone());

    // Time index names that are overwritten by some other column.
    let shadowed: HashSet<&str> = alias_map
        .iter()
        .filter(|(source, output)| {
            !time_index.contains(source) && source != output && time_index.contains(output)
        })
        .map(|(_, output)| output.as_str())
        .collect();

    // Original time index names that survive the projection untouched.
    let kept = time_index
        .iter()
        .filter(|name| !shadowed.contains(name.as_str()))
        .cloned();

    renamed.chain(kept).collect()
}
272
#[cfg(test)]
mod test {
    use itertools::Itertools;

    use super::*;

    /// Builds an owned name set from string literals.
    fn names(items: &[&str]) -> HashSet<String> {
        items.iter().map(|item| item.to_string()).collect()
    }

    /// Runs `resolve_alias` on one case and compares against `expected`.
    fn check(aliases: &[(&str, &str)], time_index: &[&str], expected: &[&str]) {
        let alias_map = aliases
            .iter()
            .map(|(k, v)| (k.to_string(), v.to_string()))
            .collect_vec();
        let time_index = names(time_index);
        let expected = names(expected);

        assert_eq!(
            expected,
            resolve_alias(&alias_map, &time_index),
            "alias_map={:?}, time_index={:?}",
            alias_map,
            time_index
        );
    }

    #[test]
    fn test_alias() {
        // notice the old name is still in the result
        check(&[("a", "b"), ("b", "c")], &["a"], &["a", "b"]);
        // alias swap
        check(&[("b", "a"), ("a", "b")], &["a"], &["b"]);
        // time index name taken over by another column
        check(&[("b", "a"), ("b", "c")], &["a"], &[]);
        // not in alias map
        check(&[("c", "d"), ("d", "c")], &["a"], &["a"]);
        // no alias
        check(&[], &["a"], &["a"]);
        // empty time index
        check(&[], &[], &[]);
    }
}