meta_srv/selector/
common.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::collections::HashSet;
16
17use common_meta::peer::Peer;
18use snafu::ensure;
19
20use crate::error;
21use crate::error::Result;
22use crate::metasrv::SelectTarget;
23use crate::selector::weighted_choose::{WeightedChoose, WeightedItem};
24use crate::selector::SelectorOptions;
25
26/// Filter out the excluded peers from the `weight_array`.
27pub fn filter_out_excluded_peers(
28    weight_array: &mut Vec<WeightedItem<Peer>>,
29    exclude_peer_ids: &HashSet<u64>,
30) {
31    weight_array.retain(|peer| !exclude_peer_ids.contains(&peer.item.id));
32}
33
34/// According to the `opts`, choose peers from the `weight_array` through `weighted_choose`.
35pub fn choose_items<W>(opts: &SelectorOptions, weighted_choose: &mut W) -> Result<Vec<Peer>>
36where
37    W: WeightedChoose<Peer>,
38{
39    let min_required_items = opts.min_required_items;
40    ensure!(
41        !weighted_choose.is_empty(),
42        error::NoEnoughAvailableNodeSnafu {
43            required: min_required_items,
44            available: 0_usize,
45            select_target: SelectTarget::Datanode
46        }
47    );
48
49    if min_required_items == 1 {
50        // fast path
51        return Ok(vec![weighted_choose.choose_one()?]);
52    }
53
54    let available_count = weighted_choose.len();
55
56    if opts.allow_duplication {
57        // Calculate how many complete rounds of `available_count` items to select,
58        // plus any additional items needed after complete rounds.
59        let complete_batches = min_required_items / available_count;
60        let leftover_items = min_required_items % available_count;
61        if complete_batches == 0 {
62            return weighted_choose.choose_multiple(leftover_items);
63        }
64
65        let mut result = Vec::with_capacity(min_required_items);
66        for _ in 0..complete_batches {
67            result.extend(weighted_choose.choose_multiple(available_count)?);
68        }
69        result.extend(weighted_choose.choose_multiple(leftover_items)?);
70
71        Ok(result)
72    } else {
73        // Ensure the available items are sufficient when duplication is not allowed.
74        ensure!(
75            available_count >= min_required_items,
76            error::NoEnoughAvailableNodeSnafu {
77                required: min_required_items,
78                available: available_count,
79                select_target: SelectTarget::Datanode
80            }
81        );
82
83        weighted_choose.choose_multiple(min_required_items)
84    }
85}
86
#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use common_meta::peer::Peer;

    use crate::selector::common::{choose_items, filter_out_excluded_peers};
    use crate::selector::weighted_choose::{RandomWeightedChoose, WeightedItem};
    use crate::selector::SelectorOptions;

    /// Builds a weighted entry (weight `1.0`) for a peer with the given id and address.
    fn weighted_peer(id: u64, addr: &str) -> WeightedItem<Peer> {
        WeightedItem {
            item: Peer {
                id,
                addr: addr.to_string(),
            },
            weight: 1.0,
        }
    }

    /// Builds selector options with an empty exclusion set.
    fn options(min_required_items: usize, allow_duplication: bool) -> SelectorOptions {
        SelectorOptions {
            min_required_items,
            allow_duplication,
            exclude_peer_ids: HashSet::new(),
        }
    }

    #[test]
    fn test_choose_peers() {
        // Five peers with ids 1..=5, all sharing the same address and weight.
        let weight_array: Vec<_> = (1..=5)
            .map(|id| weighted_peer(id, "127.0.0.1:3001"))
            .collect();

        // Without duplication, requesting up to the pool size yields that many distinct peers.
        for required in 1..=5 {
            let opts = options(required, false);
            let mut chooser = RandomWeightedChoose::new(weight_array.clone());

            let picked = choose_items(&opts, &mut chooser).unwrap();
            let distinct: HashSet<_> = picked.into_iter().collect();

            assert_eq!(required, distinct.len());
        }

        // Without duplication, asking for more than the pool holds must fail.
        let opts = options(6, false);
        let mut chooser = RandomWeightedChoose::new(weight_array.clone());
        let outcome = choose_items(&opts, &mut chooser);
        assert!(outcome.is_err());

        // With duplication, any requested amount is returned in full.
        for required in 1..=50 {
            let opts = options(required, true);
            let mut chooser = RandomWeightedChoose::new(weight_array.clone());

            let picked = choose_items(&opts, &mut chooser).unwrap();

            assert_eq!(required, picked.len());
        }
    }

    #[test]
    fn test_filter_out_excluded_peers() {
        let mut weight_array = vec![
            weighted_peer(1, "127.0.0.1:3001"),
            weighted_peer(2, "127.0.0.1:3002"),
        ];

        let exclude_peer_ids = HashSet::from([1]);
        filter_out_excluded_peers(&mut weight_array, &exclude_peer_ids);

        // Only the peer that was not excluded remains.
        assert_eq!(weight_array.len(), 1);
        assert_eq!(weight_array[0].item.id, 2);
    }
}