partition/
subtask.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::VecDeque;
16
17use crate::error::Result;
18use crate::expr::PartitionExpr;
19use crate::overlap::associate_from_to;
20
/// One connected component of the FROM/TO graph: a self-contained slice of the
/// repartition work.
///
/// Every index stored here refers to a position in the original input slices of
/// [`PartitionExpr`], never to a position local to this subtask.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RepartitionSubtask {
    /// Positions in the original `from_exprs` that belong to this component.
    pub from_expr_indices: Vec<usize>,
    /// Positions in the original `to_exprs` that belong to this component.
    pub to_expr_indices: Vec<usize>,
    /// Parallel to `from_expr_indices`: entry `k` lists the positions in the
    /// original `to_exprs` that overlap with the FROM expression at
    /// `from_expr_indices[k]`.
    pub transition_map: Vec<Vec<usize>>,
}
30
31/// Create independent subtasks out of given FROM/TO partition expressions.
32pub fn create_subtasks(
33    from_exprs: &[PartitionExpr],
34    to_exprs: &[PartitionExpr],
35) -> Result<Vec<RepartitionSubtask>> {
36    // FROM -> TO
37    let assoc = associate_from_to(from_exprs, to_exprs)?;
38    if !assoc.iter().any(|v| !v.is_empty()) {
39        return Ok(vec![]);
40    }
41
42    // TO -> FROM
43    let mut rev = vec![Vec::new(); to_exprs.len()];
44    for (li, rights) in assoc.iter().enumerate() {
45        for &r in rights {
46            rev[r].push(li);
47        }
48    }
49
50    // FROM(left), TO(right). Undirected
51    let mut visited_left = vec![false; from_exprs.len()];
52    let mut visited_right = vec![false; to_exprs.len()];
53    let mut subtasks = Vec::new();
54
55    for li in 0..from_exprs.len() {
56        if assoc[li].is_empty() || visited_left[li] {
57            continue;
58        }
59
60        #[derive(Copy, Clone)]
61        enum Node {
62            Left(usize),
63            Right(usize),
64        }
65        let mut left_set = Vec::new();
66        let mut right_set = Vec::new();
67        let mut queue = VecDeque::new();
68
69        visited_left[li] = true;
70        queue.push_back(Node::Left(li));
71
72        while let Some(node) = queue.pop_front() {
73            match node {
74                Node::Left(left) => {
75                    left_set.push(left);
76                    for &r in &assoc[left] {
77                        if !visited_right[r] {
78                            visited_right[r] = true;
79                            queue.push_back(Node::Right(r));
80                        }
81                    }
82                }
83                Node::Right(right) => {
84                    right_set.push(right);
85                    for &l in &rev[right] {
86                        if !visited_left[l] {
87                            visited_left[l] = true;
88                            queue.push_back(Node::Left(l));
89                        }
90                    }
91                }
92            }
93        }
94
95        left_set.sort_unstable();
96        right_set.sort_unstable();
97
98        let transition_map = left_set
99            .iter()
100            .map(|&i| assoc[i].clone())
101            .collect::<Vec<_>>();
102
103        subtasks.push(RepartitionSubtask {
104            from_expr_indices: left_set,
105            to_expr_indices: right_set,
106            transition_map,
107        });
108    }
109
110    Ok(subtasks)
111}
112
#[cfg(test)]
mod tests {
    use datatypes::value::Value;

    use super::*;
    use crate::expr::col;

    /// Builds the half-open range predicate `lo <= column < hi`.
    fn range(column: &'static str, lo: i64, hi: i64) -> PartitionExpr {
        col(column)
            .gt_eq(Value::Int64(lo))
            .and(col(column).lt(Value::Int64(hi)))
    }

    #[test]
    fn test_split_one_to_two() {
        // Left: [0, 20)
        let from = vec![range("u", 0, 20)];

        // Right: [0, 10), [10, 20)
        let to = vec![range("u", 0, 10), range("u", 10, 20)];

        let subtasks = create_subtasks(&from, &to).unwrap();
        assert_eq!(subtasks.len(), 1);
        assert_eq!(subtasks[0].from_expr_indices, vec![0]);
        assert_eq!(subtasks[0].to_expr_indices, vec![0, 1]);
        assert_eq!(subtasks[0].transition_map[0], vec![0, 1]);
    }

    #[test]
    fn test_merge_two_to_one() {
        // Left: [0, 10), [10, 20)
        let from = vec![range("u", 0, 10), range("u", 10, 20)];
        // Right: [0, 20)
        let to = vec![range("u", 0, 20)];

        let subtasks = create_subtasks(&from, &to).unwrap();
        assert_eq!(subtasks.len(), 1);
        assert_eq!(subtasks[0].from_expr_indices, vec![0, 1]);
        assert_eq!(subtasks[0].to_expr_indices, vec![0]);
        assert_eq!(subtasks[0].transition_map[0], vec![0]);
        assert_eq!(subtasks[0].transition_map[1], vec![0]);
    }

    #[test]
    fn test_create_subtasks_disconnected() {
        // Left: A:[0,10), B:[20,30)
        let from = vec![range("x", 0, 10), range("x", 20, 30)];
        // Right: C:[5,15), D:[40,50)
        let to = vec![range("x", 5, 15), range("x", 40, 50)];

        let subtasks = create_subtasks(&from, &to).unwrap();

        // Only {A, C} forms a component. B and D overlap nothing, and nodes
        // without edges are excluded by construction.
        assert_eq!(subtasks.len(), 1);
        assert_eq!(subtasks[0].from_expr_indices, vec![0]);
        assert_eq!(subtasks[0].to_expr_indices, vec![0]);
        assert_eq!(subtasks[0].transition_map, vec![vec![0]]);
    }

    #[test]
    fn test_create_subtasks_multi() {
        // Left: [0,100), [100,200)
        let from = vec![range("u", 0, 100), range("u", 100, 200)];
        // Right: [0,50), [50,150), [150,250)
        let to = vec![
            range("u", 0, 50),
            range("u", 50, 150),
            range("u", 150, 250),
        ];

        let subtasks = create_subtasks(&from, &to).unwrap();
        // All connected into a single component
        assert_eq!(subtasks.len(), 1);
        assert_eq!(subtasks[0].from_expr_indices, vec![0, 1]);
        assert_eq!(subtasks[0].to_expr_indices, vec![0, 1, 2]);
        // [0,100) -> [0,50), [50,150)
        // [100,200) -> [50,150), [150,250)
        assert_eq!(subtasks[0].transition_map[0], vec![0, 1]);
        assert_eq!(subtasks[0].transition_map[1], vec![1, 2]);
    }

    #[test]
    fn test_two_components() {
        // Left: A:[0,10), B:[20,30)
        let from = vec![range("x", 0, 10), range("x", 20, 30)];
        // Right: C:[5,7), D:[22,28)
        let to = vec![range("x", 5, 7), range("x", 22, 28)];

        let mut subtasks = create_subtasks(&from, &to).unwrap();
        assert_eq!(subtasks.len(), 2);
        // Sort by smallest left index so the assertions below do not depend on
        // the order in which components are emitted.
        subtasks.sort_by_key(|s| s.from_expr_indices[0]);
        assert_eq!(subtasks[0].from_expr_indices, vec![0]);
        assert_eq!(subtasks[0].to_expr_indices, vec![0]);
        assert_eq!(subtasks[0].transition_map, vec![vec![0]]);
        assert_eq!(subtasks[1].from_expr_indices, vec![1]);
        assert_eq!(subtasks[1].to_expr_indices, vec![1]);
        assert_eq!(subtasks[1].transition_map, vec![vec![1]]);
    }

    #[test]
    fn test_bridge_single_component() {
        // Left: [0,10), [10,20)
        let from = vec![range("u", 0, 10), range("u", 10, 20)];
        // Right: [5,15), [15,25)
        let to = vec![range("u", 5, 15), range("u", 15, 25)];

        let subtasks = create_subtasks(&from, &to).unwrap();
        // Right [5,15) bridges both left ranges, so everything is one component.
        assert_eq!(subtasks.len(), 1);
        assert_eq!(subtasks[0].from_expr_indices, vec![0, 1]);
        assert_eq!(subtasks[0].to_expr_indices, vec![0, 1]);
        assert_eq!(subtasks[0].transition_map[0], vec![0]);
        assert_eq!(subtasks[0].transition_map[1], vec![0, 1]);
    }

    #[test]
    fn test_all_isolated_no_subtasks() {
        // No edges at all: k < 10 and k >= 10 never overlap.
        let from = vec![col("k").lt(Value::Int64(10))];
        let to = vec![col("k").gt_eq(Value::Int64(10))];
        let subtasks = create_subtasks(&from, &to).unwrap();
        assert!(subtasks.is_empty());
    }
}