query/optimizer/
windowed_sort.rs1use std::collections::HashSet;
16use std::sync::Arc;
17
18use datafusion::physical_optimizer::PhysicalOptimizerRule;
19use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
20use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
21use datafusion::physical_plan::filter::FilterExec;
22use datafusion::physical_plan::projection::ProjectionExec;
23use datafusion::physical_plan::repartition::RepartitionExec;
24use datafusion::physical_plan::sorts::sort::SortExec;
25use datafusion::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
26use datafusion::physical_plan::ExecutionPlan;
27use datafusion_common::tree_node::{Transformed, TreeNode};
28use datafusion_common::Result as DataFusionResult;
29use datafusion_physical_expr::expressions::Column as PhysicalColumn;
30use datafusion_physical_expr::LexOrdering;
31use store_api::region_engine::PartitionRange;
32use table::table::scan::RegionScanExec;
33
34use crate::part_sort::PartSortExec;
35use crate::window_sort::WindowedSortExec;
36
/// Physical optimizer rule that rewrites a single-expression `SortExec` ordered
/// by the region's time index into a `WindowedSortExec` (optionally fed by a
/// `PartSortExec`), using the partition ranges reported by the `RegionScanExec`
/// underneath the sort.
#[derive(Debug)]
pub struct WindowedSortPhysicalRule;
48
49impl PhysicalOptimizerRule for WindowedSortPhysicalRule {
50 fn optimize(
51 &self,
52 plan: Arc<dyn ExecutionPlan>,
53 config: &datafusion::config::ConfigOptions,
54 ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
55 Self::do_optimize(plan, config)
56 }
57
58 fn name(&self) -> &str {
59 "WindowedSortRule"
60 }
61
62 fn schema_check(&self) -> bool {
63 false
64 }
65}
66
67impl WindowedSortPhysicalRule {
68 fn do_optimize(
69 plan: Arc<dyn ExecutionPlan>,
70 _config: &datafusion::config::ConfigOptions,
71 ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
72 let result = plan
73 .transform_down(|plan| {
74 if let Some(sort_exec) = plan.as_any().downcast_ref::<SortExec>() {
75 if sort_exec.expr().len() != 1 {
77 return Ok(Transformed::no(plan));
78 }
79
80 let preserve_partitioning = sort_exec.preserve_partitioning();
81
82 let sort_input = remove_repartition(sort_exec.input().clone())?.data;
83 let sort_input =
84 remove_coalesce_batches_exec(sort_input, sort_exec.fetch())?.data;
85
86 let Some(scanner_info) = fetch_partition_range(sort_input.clone())? else {
88 return Ok(Transformed::no(plan));
89 };
90 let input_schema = sort_input.schema();
91
92 if let Some(first_sort_expr) = sort_exec.expr().first()
93 && let Some(column_expr) = first_sort_expr
94 .expr
95 .as_any()
96 .downcast_ref::<PhysicalColumn>()
97 && scanner_info
98 .time_index
99 .contains(input_schema.field(column_expr.index()).name())
100 {
101 } else {
102 return Ok(Transformed::no(plan));
103 }
104 let first_sort_expr = sort_exec.expr().first().unwrap().clone();
105
106 let new_input = if scanner_info.tag_columns.is_empty()
110 && !first_sort_expr.options.descending
111 {
112 sort_input
113 } else {
114 Arc::new(PartSortExec::new(
115 first_sort_expr.clone(),
116 sort_exec.fetch(),
117 scanner_info.partition_ranges.clone(),
118 sort_input,
119 ))
120 };
121
122 let windowed_sort_exec = WindowedSortExec::try_new(
123 first_sort_expr,
124 sort_exec.fetch(),
125 scanner_info.partition_ranges,
126 new_input,
127 )?;
128
129 if !preserve_partitioning {
130 let order_preserving_merge = SortPreservingMergeExec::new(
131 LexOrdering::new(sort_exec.expr().to_vec()),
132 Arc::new(windowed_sort_exec),
133 );
134 return Ok(Transformed {
135 data: Arc::new(order_preserving_merge),
136 transformed: true,
137 tnr: datafusion_common::tree_node::TreeNodeRecursion::Stop,
138 });
139 } else {
140 return Ok(Transformed {
141 data: Arc::new(windowed_sort_exec),
142 transformed: true,
143 tnr: datafusion_common::tree_node::TreeNodeRecursion::Stop,
144 });
145 }
146 }
147
148 Ok(Transformed::no(plan))
149 })?
150 .data;
151
152 Ok(result)
153 }
154}
155
/// Information about the `RegionScanExec` underneath a sort, as collected by
/// `fetch_partition_range`.
#[derive(Debug)]
struct ScannerInfo {
    /// Ranges from `RegionScanExec::get_uncollapsed_partition_ranges`; the outer
    /// `Vec` is presumably indexed by scan partition — TODO confirm.
    partition_ranges: Vec<Vec<PartitionRange>>,
    /// Names under which the region's time index column is visible at the sort
    /// input, including aliases introduced by projections. `resolve_alias` keeps
    /// names it is not told about, so this may also hold names that are no longer
    /// in the schema.
    time_index: HashSet<String>,
    /// Tag column names reported by the scan.
    tag_columns: Vec<String>,
}
162
163fn fetch_partition_range(input: Arc<dyn ExecutionPlan>) -> DataFusionResult<Option<ScannerInfo>> {
164 let mut partition_ranges = None;
165 let mut time_index = HashSet::new();
166 let mut alias_map = Vec::new();
167 let mut tag_columns = None;
168 let mut is_batch_coalesced = false;
169
170 input.transform_up(|plan| {
171 if plan.as_any().is::<RepartitionExec>()
173 || plan.as_any().is::<CoalescePartitionsExec>()
174 || plan.as_any().is::<SortExec>()
175 || plan.as_any().is::<WindowedSortExec>()
176 {
177 partition_ranges = None;
178 }
179
180 if plan.as_any().is::<CoalesceBatchesExec>() {
181 is_batch_coalesced = true;
182 }
183
184 if !(plan.as_any().is::<ProjectionExec>()
187 || plan.as_any().is::<FilterExec>()
188 || plan.as_any().is::<CoalesceBatchesExec>())
189 {
190 partition_ranges = None;
191 }
192
193 if let Some(projection) = plan.as_any().downcast_ref::<ProjectionExec>() {
196 for (expr, output_name) in projection.expr() {
197 if let Some(column_expr) = expr.as_any().downcast_ref::<PhysicalColumn>() {
198 alias_map.push((column_expr.name().to_string(), output_name.clone()));
199 }
200 }
201 time_index = resolve_alias(&alias_map, &time_index);
203 }
204
205 if let Some(region_scan_exec) = plan.as_any().downcast_ref::<RegionScanExec>() {
206 if region_scan_exec.distribution()
208 == Some(store_api::storage::TimeSeriesDistribution::PerSeries)
209 {
210 partition_ranges = None;
211 return Ok(Transformed::no(plan));
212 }
213
214 partition_ranges = Some(region_scan_exec.get_uncollapsed_partition_ranges());
215 time_index = HashSet::from([region_scan_exec.time_index()]);
217 tag_columns = Some(region_scan_exec.tag_columns());
218
219 if !is_batch_coalesced {
221 region_scan_exec.with_distinguish_partition_range(true);
222 }
223 }
224
225 Ok(Transformed::no(plan))
226 })?;
227
228 let result = try {
229 ScannerInfo {
230 partition_ranges: partition_ranges?,
231 time_index,
232 tag_columns: tag_columns?,
233 }
234 };
235
236 Ok(result)
237}
238
239fn remove_repartition(
241 plan: Arc<dyn ExecutionPlan>,
242) -> DataFusionResult<Transformed<Arc<dyn ExecutionPlan>>> {
243 plan.transform_down(|plan| {
244 if plan.as_any().is::<FilterExec>() {
245 let maybe_repartition = plan.children()[0];
247 if maybe_repartition.as_any().is::<RepartitionExec>() {
248 let maybe_scan = maybe_repartition.children()[0];
249 if maybe_scan.as_any().is::<RegionScanExec>() {
250 let new_filter = plan.clone().with_new_children(vec![maybe_scan.clone()])?;
251 return Ok(Transformed::yes(new_filter));
252 }
253 }
254 }
255
256 Ok(Transformed::no(plan))
257 })
258}
259
260fn remove_coalesce_batches_exec(
264 plan: Arc<dyn ExecutionPlan>,
265 fetch: Option<usize>,
266) -> DataFusionResult<Transformed<Arc<dyn ExecutionPlan>>> {
267 let Some(fetch) = fetch else {
268 return Ok(Transformed::no(plan));
269 };
270
271 let mut is_done = false;
273
274 plan.transform_down(|plan| {
275 if let Some(coalesce_batches_exec) = plan.as_any().downcast_ref::<CoalesceBatchesExec>() {
276 let target_batch_size = coalesce_batches_exec.target_batch_size();
277 if fetch < target_batch_size && !is_done {
278 is_done = true;
279 return Ok(Transformed::yes(coalesce_batches_exec.input().clone()));
280 }
281 }
282
283 Ok(Transformed::no(plan))
284 })
285}
286
/// Resolves the names under which the time index is visible after applying
/// `alias_map`, a list of `(input column name, output name)` pairs.
///
/// Every output name whose source is in `time_index` is added. A name in
/// `time_index` is dropped only when a column *not* in `time_index` is written to
/// it. Names the map does not mention are kept.
///
/// E.g. with time index `{a}` and aliases `[(a, b), (b, c)]` the result is
/// `{a, b}`. `b` aliases `a`, while `c` aliases `b`, which was not in the input set.
fn resolve_alias(alias_map: &[(String, String)], time_index: &HashSet<String>) -> HashSet<String> {
    // Time index names overwritten by some other column.
    let shadowed: HashSet<&str> = alias_map
        .iter()
        .filter(|(old, new)| !time_index.contains(old) && time_index.contains(new) && old != new)
        .map(|(_, new)| new.as_str())
        .collect();

    // Output names fed directly from a time index name.
    let aliased = alias_map
        .iter()
        .filter(|(old, _)| time_index.contains(old))
        .map(|(_, new)| new.clone());

    // Previous names that nothing overwrote.
    let retained = time_index
        .iter()
        .filter(|name| !shadowed.contains(name.as_str()))
        .cloned();

    aliased.chain(retained).collect()
}
308
#[cfg(test)]
mod test {
    use itertools::Itertools;

    use super::*;

    /// Converts string-literal inputs to owned values and checks `resolve_alias`
    /// against `expected`.
    fn check_resolve_alias(aliases: &[(&str, &str)], time_index: &[&str], expected: &[&str]) {
        let alias_map = aliases
            .iter()
            .map(|(old, new)| (old.to_string(), new.to_string()))
            .collect_vec();
        let time_index: HashSet<String> = time_index.iter().map(|name| name.to_string()).collect();
        let expected: HashSet<String> = expected.iter().map(|name| name.to_string()).collect();

        assert_eq!(
            expected,
            resolve_alias(&alias_map, &time_index),
            "alias_map={:?}, time_index={:?}",
            alias_map,
            time_index
        );
    }

    #[test]
    fn test_alias() {
        // `b` aliases the time index `a`; `c` aliases `b`, which was not an input name.
        check_resolve_alias(&[("a", "b"), ("b", "c")], &["a"], &["a", "b"]);
        // Swapped names: `b` now carries the time index and `a` is overwritten.
        check_resolve_alias(&[("b", "a"), ("a", "b")], &["a"], &["b"]);
        // `a` is overwritten and nothing aliases the time index.
        check_resolve_alias(&[("b", "a"), ("b", "c")], &["a"], &[]);
        // Aliases unrelated to the time index leave it unchanged.
        check_resolve_alias(&[("c", "d"), ("d", "c")], &["a"], &["a"]);
        // No aliases: the set passes through unchanged.
        check_resolve_alias(&[], &["a"], &["a"]);
        check_resolve_alias(&[], &[], &[]);
    }
}