1use std::collections::VecDeque;
16
17use crate::error::Result;
18use crate::expr::PartitionExpr;
19use crate::overlap::associate_from_to;
20
21#[derive(Debug, Clone, PartialEq, Eq)]
23pub struct RepartitionSubtask {
24 pub from_expr_indices: Vec<usize>,
25 pub to_expr_indices: Vec<usize>,
26 pub transition_map: Vec<Vec<usize>>,
29}
30
31pub fn create_subtasks(
33 from_exprs: &[PartitionExpr],
34 to_exprs: &[PartitionExpr],
35) -> Result<Vec<RepartitionSubtask>> {
36 let assoc = associate_from_to(from_exprs, to_exprs)?;
38 if !assoc.iter().any(|v| !v.is_empty()) {
39 return Ok(vec![]);
40 }
41
42 let mut rev = vec![Vec::new(); to_exprs.len()];
44 for (li, rights) in assoc.iter().enumerate() {
45 for &r in rights {
46 rev[r].push(li);
47 }
48 }
49
50 let mut visited_left = vec![false; from_exprs.len()];
52 let mut visited_right = vec![false; to_exprs.len()];
53 let mut subtasks = Vec::new();
54
55 for li in 0..from_exprs.len() {
56 if assoc[li].is_empty() || visited_left[li] {
57 continue;
58 }
59
60 #[derive(Copy, Clone)]
61 enum Node {
62 Left(usize),
63 Right(usize),
64 }
65 let mut left_set = Vec::new();
66 let mut right_set = Vec::new();
67 let mut queue = VecDeque::new();
68
69 visited_left[li] = true;
70 queue.push_back(Node::Left(li));
71
72 while let Some(node) = queue.pop_front() {
73 match node {
74 Node::Left(left) => {
75 left_set.push(left);
76 for &r in &assoc[left] {
77 if !visited_right[r] {
78 visited_right[r] = true;
79 queue.push_back(Node::Right(r));
80 }
81 }
82 }
83 Node::Right(right) => {
84 right_set.push(right);
85 for &l in &rev[right] {
86 if !visited_left[l] {
87 visited_left[l] = true;
88 queue.push_back(Node::Left(l));
89 }
90 }
91 }
92 }
93 }
94
95 left_set.sort_unstable();
96 right_set.sort_unstable();
97
98 let transition_map = left_set
99 .iter()
100 .map(|&i| assoc[i].clone())
101 .collect::<Vec<_>>();
102
103 subtasks.push(RepartitionSubtask {
104 from_expr_indices: left_set,
105 to_expr_indices: right_set,
106 transition_map,
107 });
108 }
109
110 Ok(subtasks)
111}
112
113#[cfg(test)]
114mod tests {
115 use datatypes::value::Value;
116
117 use super::*;
118 use crate::expr::col;
119 #[test]
120 fn test_split_one_to_two() {
121 let from = vec![
123 col("u")
124 .gt_eq(Value::Int64(0))
125 .and(col("u").lt(Value::Int64(20))),
126 ];
127
128 let to = vec![
130 col("u")
131 .gt_eq(Value::Int64(0))
132 .and(col("u").lt(Value::Int64(10))),
133 col("u")
134 .gt_eq(Value::Int64(10))
135 .and(col("u").lt(Value::Int64(20))),
136 ];
137
138 let subtasks = create_subtasks(&from, &to).unwrap();
139 assert_eq!(subtasks.len(), 1);
140 assert_eq!(subtasks[0].from_expr_indices, vec![0]);
141 assert_eq!(subtasks[0].to_expr_indices, vec![0, 1]);
142 assert_eq!(subtasks[0].transition_map[0], vec![0, 1]);
143 }
144
145 #[test]
146 fn test_merge_two_to_one() {
147 let from = vec![
149 col("u")
150 .gt_eq(Value::Int64(0))
151 .and(col("u").lt(Value::Int64(10))),
152 col("u")
153 .gt_eq(Value::Int64(10))
154 .and(col("u").lt(Value::Int64(20))),
155 ];
156 let to = vec![
158 col("u")
159 .gt_eq(Value::Int64(0))
160 .and(col("u").lt(Value::Int64(20))),
161 ];
162
163 let subtasks = create_subtasks(&from, &to).unwrap();
164 assert_eq!(subtasks.len(), 1);
165 assert_eq!(subtasks[0].from_expr_indices, vec![0, 1]);
166 assert_eq!(subtasks[0].to_expr_indices, vec![0]);
167 assert_eq!(subtasks[0].transition_map[0], vec![0]);
168 assert_eq!(subtasks[0].transition_map[1], vec![0]);
169 }
170
171 #[test]
172 fn test_create_subtasks_disconnected() {
173 let from = vec![
175 col("x")
176 .gt_eq(Value::Int64(0))
177 .and(col("x").lt(Value::Int64(10))),
178 col("x")
179 .gt_eq(Value::Int64(20))
180 .and(col("x").lt(Value::Int64(30))),
181 ];
182 let to = vec![
184 col("x")
185 .gt_eq(Value::Int64(5))
186 .and(col("x").lt(Value::Int64(15))),
187 col("x")
188 .gt_eq(Value::Int64(40))
189 .and(col("x").lt(Value::Int64(50))),
190 ];
191
192 let subtasks = create_subtasks(&from, &to).unwrap();
193
194 assert_eq!(subtasks.len(), 1);
197 assert_eq!(subtasks[0].from_expr_indices, vec![0]);
198 assert_eq!(subtasks[0].to_expr_indices, vec![0]);
199 assert_eq!(subtasks[0].transition_map, vec![vec![0]]);
200 }
201
202 #[test]
203 fn test_create_subtasks_multi() {
204 let from = vec![
206 col("u")
207 .gt_eq(Value::Int64(0))
208 .and(col("u").lt(Value::Int64(100))),
209 col("u")
210 .gt_eq(Value::Int64(100))
211 .and(col("u").lt(Value::Int64(200))),
212 ];
213 let to = vec![
215 col("u")
216 .gt_eq(Value::Int64(0))
217 .and(col("u").lt(Value::Int64(50))),
218 col("u")
219 .gt_eq(Value::Int64(50))
220 .and(col("u").lt(Value::Int64(150))),
221 col("u")
222 .gt_eq(Value::Int64(150))
223 .and(col("u").lt(Value::Int64(250))),
224 ];
225
226 let subtasks = create_subtasks(&from, &to).unwrap();
227 assert_eq!(subtasks.len(), 1);
229 assert_eq!(subtasks[0].from_expr_indices, vec![0, 1]);
230 assert_eq!(subtasks[0].to_expr_indices, vec![0, 1, 2]);
231 assert_eq!(subtasks[0].transition_map[0], vec![0, 1]);
234 assert_eq!(subtasks[0].transition_map[1], vec![1, 2]);
235 }
236
237 #[test]
238 fn test_two_components() {
239 let from = vec![
241 col("x")
242 .gt_eq(Value::Int64(0))
243 .and(col("x").lt(Value::Int64(10))),
244 col("x")
245 .gt_eq(Value::Int64(20))
246 .and(col("x").lt(Value::Int64(30))),
247 ];
248 let to = vec![
250 col("x")
251 .gt_eq(Value::Int64(5))
252 .and(col("x").lt(Value::Int64(7))),
253 col("x")
254 .gt_eq(Value::Int64(22))
255 .and(col("x").lt(Value::Int64(28))),
256 ];
257 let mut subtasks = create_subtasks(&from, &to).unwrap();
258 assert_eq!(subtasks.len(), 2);
260 subtasks.sort_by_key(|s| s.from_expr_indices[0]);
262 assert_eq!(subtasks[0].from_expr_indices, vec![0]);
263 assert_eq!(subtasks[0].to_expr_indices, vec![0]);
264 assert_eq!(subtasks[0].transition_map, vec![vec![0]]);
265 assert_eq!(subtasks[1].from_expr_indices, vec![1]);
266 assert_eq!(subtasks[1].to_expr_indices, vec![1]);
267 assert_eq!(subtasks[1].transition_map, vec![vec![1]]);
268 }
269
270 #[test]
271 fn test_bridge_single_component() {
272 let from = vec![
274 col("u")
275 .gt_eq(Value::Int64(0))
276 .and(col("u").lt(Value::Int64(10))),
277 col("u")
278 .gt_eq(Value::Int64(10))
279 .and(col("u").lt(Value::Int64(20))),
280 ];
281 let to = vec![
283 col("u")
284 .gt_eq(Value::Int64(5))
285 .and(col("u").lt(Value::Int64(15))),
286 col("u")
287 .gt_eq(Value::Int64(15))
288 .and(col("u").lt(Value::Int64(25))),
289 ];
290 let subtasks = create_subtasks(&from, &to).unwrap();
291 assert_eq!(subtasks.len(), 1);
292 assert_eq!(subtasks[0].from_expr_indices, vec![0, 1]);
293 assert_eq!(subtasks[0].to_expr_indices, vec![0, 1]);
294 assert_eq!(subtasks[0].transition_map[0], vec![0]);
295 assert_eq!(subtasks[0].transition_map[1], vec![0, 1]);
296 }
297
298 #[test]
299 fn test_all_isolated_no_subtasks() {
300 let from = vec![col("k").lt(Value::Int64(10))];
302 let to = vec![col("k").gt_eq(Value::Int64(10))];
303 let subtasks = create_subtasks(&from, &to).unwrap();
304 assert!(subtasks.is_empty());
305 }
306}