query/optimizer/
windowed_sort.rs1use std::collections::HashSet;
16use std::sync::Arc;
17
18use datafusion::physical_optimizer::PhysicalOptimizerRule;
19use datafusion::physical_plan::ExecutionPlan;
20use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
21use datafusion::physical_plan::coop::CooperativeExec;
22use datafusion::physical_plan::filter::FilterExec;
23use datafusion::physical_plan::projection::ProjectionExec;
24use datafusion::physical_plan::repartition::RepartitionExec;
25use datafusion::physical_plan::sorts::sort::SortExec;
26use datafusion::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
27use datafusion_common::Result as DataFusionResult;
28use datafusion_common::tree_node::{Transformed, TreeNode};
29use datafusion_physical_expr::expressions::Column as PhysicalColumn;
30use store_api::region_engine::PartitionRange;
31use table::table::scan::RegionScanExec;
32
33use crate::part_sort::PartSortExec;
34use crate::window_sort::WindowedSortExec;
35
36#[derive(Debug)]
46pub struct WindowedSortPhysicalRule;
47
48impl PhysicalOptimizerRule for WindowedSortPhysicalRule {
49 fn optimize(
50 &self,
51 plan: Arc<dyn ExecutionPlan>,
52 config: &datafusion::config::ConfigOptions,
53 ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
54 Self::do_optimize(plan, config)
55 }
56
57 fn name(&self) -> &str {
58 "WindowedSortRule"
59 }
60
61 fn schema_check(&self) -> bool {
62 false
63 }
64}
65
66impl WindowedSortPhysicalRule {
67 fn do_optimize(
68 plan: Arc<dyn ExecutionPlan>,
69 _config: &datafusion::config::ConfigOptions,
70 ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
71 let result = plan
72 .transform_down(|plan| {
73 if let Some(sort_exec) = plan.as_any().downcast_ref::<SortExec>() {
74 if sort_exec.expr().len() != 1 {
76 return Ok(Transformed::no(plan));
77 }
78
79 let preserve_partitioning = sort_exec.preserve_partitioning();
80
81 let sort_input = remove_repartition(sort_exec.input().clone())?.data;
82
83 let Some(scanner_info) = fetch_partition_range(sort_input.clone())? else {
85 return Ok(Transformed::no(plan));
86 };
87 let input_schema = sort_input.schema();
88
89 let first_sort_expr = sort_exec.expr().first();
90 if let Some(column_expr) = first_sort_expr
91 .expr
92 .as_any()
93 .downcast_ref::<PhysicalColumn>()
94 && scanner_info
95 .time_index
96 .contains(input_schema.field(column_expr.index()).name())
97 && sort_exec.fetch().is_none()
98 {
100 } else {
101 return Ok(Transformed::no(plan));
102 }
103
104 let new_input = if scanner_info.tag_columns.is_empty()
108 && !first_sort_expr.options.descending
109 {
110 sort_input
111 } else {
112 Arc::new(PartSortExec::try_new(
113 first_sort_expr.clone(),
114 sort_exec.fetch(),
115 scanner_info.partition_ranges.clone(),
116 sort_input,
117 )?)
118 };
119
120 let windowed_sort_exec = WindowedSortExec::try_new(
121 first_sort_expr.clone(),
122 sort_exec.fetch(),
123 scanner_info.partition_ranges,
124 new_input,
125 )?;
126
127 if !preserve_partitioning {
128 let order_preserving_merge = SortPreservingMergeExec::new(
129 sort_exec.expr().clone(),
130 Arc::new(windowed_sort_exec),
131 );
132 return Ok(Transformed {
133 data: Arc::new(order_preserving_merge),
134 transformed: true,
135 tnr: datafusion_common::tree_node::TreeNodeRecursion::Stop,
136 });
137 } else {
138 return Ok(Transformed {
139 data: Arc::new(windowed_sort_exec),
140 transformed: true,
141 tnr: datafusion_common::tree_node::TreeNodeRecursion::Stop,
142 });
143 }
144 }
145
146 Ok(Transformed::no(plan))
147 })?
148 .data;
149
150 Ok(result)
151 }
152}
153
154#[derive(Debug)]
155struct ScannerInfo {
156 partition_ranges: Vec<Vec<PartitionRange>>,
157 time_index: HashSet<String>,
158 tag_columns: Vec<String>,
159}
160
161fn fetch_partition_range(input: Arc<dyn ExecutionPlan>) -> DataFusionResult<Option<ScannerInfo>> {
162 let mut partition_ranges = None;
163 let mut time_index = HashSet::new();
164 let mut alias_map = Vec::new();
165 let mut tag_columns = None;
166
167 input.transform_up(|plan| {
168 if plan.as_any().is::<CooperativeExec>() {
169 return Ok(Transformed::no(plan));
170 }
171
172 if plan.as_any().is::<RepartitionExec>()
174 || plan.as_any().is::<CoalescePartitionsExec>()
175 || plan.as_any().is::<SortExec>()
176 || plan.as_any().is::<WindowedSortExec>()
177 {
178 partition_ranges = None;
179 }
180
181 if !(plan.as_any().is::<ProjectionExec>() || plan.as_any().is::<FilterExec>()) {
184 partition_ranges = None;
185 }
186
187 if let Some(projection) = plan.as_any().downcast_ref::<ProjectionExec>() {
190 for expr in projection.expr() {
191 if let Some(column_expr) = expr.expr.as_any().downcast_ref::<PhysicalColumn>() {
192 alias_map.push((column_expr.name().to_string(), expr.alias.clone()));
193 }
194 }
195 time_index = resolve_alias(&alias_map, &time_index);
197 }
198
199 if let Some(region_scan_exec) = plan.as_any().downcast_ref::<RegionScanExec>() {
200 if region_scan_exec.distribution()
202 == Some(store_api::storage::TimeSeriesDistribution::PerSeries)
203 {
204 partition_ranges = None;
205 return Ok(Transformed::no(plan));
206 }
207
208 partition_ranges = Some(region_scan_exec.get_uncollapsed_partition_ranges());
209 time_index = HashSet::from([region_scan_exec.time_index()]);
211 tag_columns = Some(region_scan_exec.tag_columns());
212
213 region_scan_exec.with_distinguish_partition_range(true);
214 }
215
216 Ok(Transformed::no(plan))
217 })?;
218
219 let result = try {
220 ScannerInfo {
221 partition_ranges: partition_ranges?,
222 time_index,
223 tag_columns: tag_columns?,
224 }
225 };
226
227 Ok(result)
228}
229
230fn remove_repartition(
232 plan: Arc<dyn ExecutionPlan>,
233) -> DataFusionResult<Transformed<Arc<dyn ExecutionPlan>>> {
234 plan.transform_down(|plan| {
235 if plan.as_any().is::<FilterExec>() {
236 let maybe_repartition = plan.children()[0];
238 if maybe_repartition.as_any().is::<RepartitionExec>() {
239 let maybe_scan = maybe_repartition.children()[0];
240 if maybe_scan.as_any().is::<RegionScanExec>() {
241 let new_filter = plan.clone().with_new_children(vec![maybe_scan.clone()])?;
242 return Ok(Transformed::yes(new_filter));
243 }
244 }
245 }
246
247 Ok(Transformed::no(plan))
248 })
249}
250
/// Computes the names the time index column goes by after applying
/// `alias_map`, a list of `(old_name, new_name)` pairs.
///
/// The result contains:
/// - the new name of every pair whose old name is currently a time index
///   name, and
/// - every current time index name, except those that some pair assigns as
///   the new name of a column that is not a time index.
fn resolve_alias(alias_map: &[(String, String)], time_index: &HashSet<String>) -> HashSet<String> {
    // Time index names that are taken over by an unrelated column.
    let shadowed: HashSet<&String> = alias_map
        .iter()
        .filter(|(old, new)| !time_index.contains(old) && old != new && time_index.contains(new))
        .map(|(_, new)| new)
        .collect();

    // Names produced by renaming an existing time index column.
    let renamed = alias_map
        .iter()
        .filter(|(old, _)| time_index.contains(old))
        .map(|(_, new)| new.clone());

    time_index
        .iter()
        .filter(|name| !shadowed.contains(name))
        .cloned()
        .chain(renamed)
        .collect()
}
272
#[cfg(test)]
mod test {
    use itertools::Itertools;

    use super::*;

    /// Table-driven check of `resolve_alias`.
    #[test]
    fn test_alias() {
        // Each row: (alias pairs as (old, new), time index before, time index after).
        let testcases: [(Vec<(&str, &str)>, Vec<&str>, Vec<&str>); 6] = [
            // A rename of the time index adds the new name and keeps the old one.
            (vec![("a", "b"), ("b", "c")], vec!["a"], vec!["a", "b"]),
            // Swapping names moves the time index to the other name.
            (vec![("b", "a"), ("a", "b")], vec!["a"], vec!["b"]),
            // Another column taking over the name leaves no time index.
            (vec![("b", "a"), ("b", "c")], vec!["a"], vec![]),
            // Aliases on unrelated columns change nothing.
            (vec![("c", "d"), ("d", "c")], vec!["a"], vec!["a"]),
            // No aliases at all.
            (vec![], vec!["a"], vec!["a"]),
            // Nothing in, nothing out.
            (vec![], vec![], vec![]),
        ];

        for (raw_aliases, raw_index, raw_expected) in testcases {
            let alias_map = raw_aliases
                .into_iter()
                .map(|(old, new)| (old.to_owned(), new.to_owned()))
                .collect_vec();
            let time_index: HashSet<String> = raw_index.into_iter().map(str::to_owned).collect();
            let expected: HashSet<String> = raw_expected.into_iter().map(str::to_owned).collect();

            let actual = resolve_alias(&alias_map, &time_index);
            assert_eq!(
                expected, actual,
                "alias_map={:?}, time_index={:?}",
                alias_map, time_index
            );
        }
    }
}