Skip to main content

meta_srv/selector/
common.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16
17use common_meta::peer::Peer;
18use snafu::ensure;
19
20use crate::error;
21use crate::error::Result;
22use crate::metasrv::SelectTarget;
23use crate::selector::SelectorOptions;
24use crate::selector::weighted_choose::{WeightedChoose, WeightedItem};
25
26/// Filter out the excluded peers from the `weight_array`.
27pub fn filter_out_excluded_peers(
28    weight_array: &mut Vec<WeightedItem<Peer>>,
29    exclude_peer_ids: &HashSet<u64>,
30) {
31    weight_array.retain(|peer| !exclude_peer_ids.contains(&peer.item.id));
32}
33
34/// According to the `opts`, choose peers from the `weight_array` through `weighted_choose`.
35pub fn choose_items<W>(opts: &SelectorOptions, weighted_choose: &mut W) -> Result<Vec<Peer>>
36where
37    W: WeightedChoose<Peer>,
38{
39    let min_required_items = opts.min_required_items;
40    ensure!(
41        !weighted_choose.is_empty(),
42        error::NoEnoughAvailableNodeSnafu {
43            required: min_required_items,
44            available: 0_usize,
45            select_target: SelectTarget::Datanode
46        }
47    );
48
49    if min_required_items == 1 {
50        // fast path
51        return Ok(vec![weighted_choose.choose_one()?]);
52    }
53
54    let available_count = weighted_choose.len();
55
56    if opts.allow_duplication {
57        // Calculate how many complete rounds of `available_count` items to select,
58        // plus any additional items needed after complete rounds.
59        let complete_batches = min_required_items / available_count;
60        let leftover_items = min_required_items % available_count;
61        if complete_batches == 0 {
62            return weighted_choose.choose_multiple(leftover_items);
63        }
64
65        let mut result = Vec::with_capacity(min_required_items);
66        for _ in 0..complete_batches {
67            result.extend(weighted_choose.choose_multiple(available_count)?);
68        }
69        result.extend(weighted_choose.choose_multiple(leftover_items)?);
70
71        Ok(result)
72    } else {
73        // Ensure the available items are sufficient when duplication is not allowed.
74        ensure!(
75            available_count >= min_required_items,
76            error::NoEnoughAvailableNodeSnafu {
77                required: min_required_items,
78                available: available_count,
79                select_target: SelectTarget::Datanode
80            }
81        );
82
83        weighted_choose.choose_multiple(min_required_items)
84    }
85}
86
#[cfg(test)]
mod tests {
    use std::collections::{HashMap, HashSet};

    use common_meta::peer::Peer;

    use crate::selector::SelectorOptions;
    use crate::selector::common::{choose_items, filter_out_excluded_peers};
    use crate::selector::weighted_choose::{RandomWeightedChoose, WeightedItem};

    /// Builds a candidate with the given id and address and a weight of `1.0`.
    fn weighted_peer(id: u64, addr: &str) -> WeightedItem<Peer> {
        WeightedItem {
            item: Peer {
                id,
                addr: addr.to_string(),
            },
            weight: 1.0,
        }
    }

    /// Builds options with no excluded peers, no workload filter and no extensions.
    fn selector_options(min_required_items: usize, allow_duplication: bool) -> SelectorOptions {
        SelectorOptions {
            min_required_items,
            allow_duplication,
            exclude_peer_ids: HashSet::new(),
            workload_filter: None,
            extensions: HashMap::new(),
        }
    }

    #[test]
    fn test_choose_peers() {
        // Five equally weighted candidates with ids 1..=5, all sharing one address.
        let weight_array: Vec<_> = (1..=5)
            .map(|id| weighted_peer(id, "127.0.0.1:3001"))
            .collect();

        // Without duplication, every request up to the candidate count yields
        // that many distinct peers.
        for required in 1..=5 {
            let opts = selector_options(required, false);
            let mut chooser = RandomWeightedChoose::new(weight_array.clone());

            let distinct_peers: HashSet<_> = choose_items(&opts, &mut chooser)
                .unwrap()
                .into_iter()
                .collect();

            assert_eq!(required, distinct_peers.len());
        }

        // Asking for more distinct peers than exist must fail.
        let opts = selector_options(6, false);
        let mut chooser = RandomWeightedChoose::new(weight_array.clone());
        assert!(choose_items(&opts, &mut chooser).is_err());

        // With duplication allowed, any requested count is satisfied exactly.
        for required in 1..=50 {
            let opts = selector_options(required, true);
            let mut chooser = RandomWeightedChoose::new(weight_array.clone());

            let selected_peers = choose_items(&opts, &mut chooser).unwrap();

            assert_eq!(required, selected_peers.len());
        }
    }

    #[test]
    fn test_filter_out_excluded_peers() {
        let mut weight_array = vec![
            weighted_peer(1, "127.0.0.1:3001"),
            weighted_peer(2, "127.0.0.1:3002"),
        ];

        let exclude_peer_ids = HashSet::from([1]);
        filter_out_excluded_peers(&mut weight_array, &exclude_peer_ids);

        // Only the peer that was not excluded remains.
        let remaining_ids: Vec<_> = weight_array.iter().map(|entry| entry.item.id).collect();
        assert_eq!(remaining_ids, vec![2]);
    }
}