meta_srv/selector/
common.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16
17use common_meta::peer::Peer;
18use snafu::ensure;
19
20use crate::error;
21use crate::error::Result;
22use crate::metasrv::SelectTarget;
23use crate::selector::SelectorOptions;
24use crate::selector::weighted_choose::{WeightedChoose, WeightedItem};
25
26/// Filter out the excluded peers from the `weight_array`.
27pub fn filter_out_excluded_peers(
28    weight_array: &mut Vec<WeightedItem<Peer>>,
29    exclude_peer_ids: &HashSet<u64>,
30) {
31    weight_array.retain(|peer| !exclude_peer_ids.contains(&peer.item.id));
32}
33
34/// According to the `opts`, choose peers from the `weight_array` through `weighted_choose`.
35pub fn choose_items<W>(opts: &SelectorOptions, weighted_choose: &mut W) -> Result<Vec<Peer>>
36where
37    W: WeightedChoose<Peer>,
38{
39    let min_required_items = opts.min_required_items;
40    ensure!(
41        !weighted_choose.is_empty(),
42        error::NoEnoughAvailableNodeSnafu {
43            required: min_required_items,
44            available: 0_usize,
45            select_target: SelectTarget::Datanode
46        }
47    );
48
49    if min_required_items == 1 {
50        // fast path
51        return Ok(vec![weighted_choose.choose_one()?]);
52    }
53
54    let available_count = weighted_choose.len();
55
56    if opts.allow_duplication {
57        // Calculate how many complete rounds of `available_count` items to select,
58        // plus any additional items needed after complete rounds.
59        let complete_batches = min_required_items / available_count;
60        let leftover_items = min_required_items % available_count;
61        if complete_batches == 0 {
62            return weighted_choose.choose_multiple(leftover_items);
63        }
64
65        let mut result = Vec::with_capacity(min_required_items);
66        for _ in 0..complete_batches {
67            result.extend(weighted_choose.choose_multiple(available_count)?);
68        }
69        result.extend(weighted_choose.choose_multiple(leftover_items)?);
70
71        Ok(result)
72    } else {
73        // Ensure the available items are sufficient when duplication is not allowed.
74        ensure!(
75            available_count >= min_required_items,
76            error::NoEnoughAvailableNodeSnafu {
77                required: min_required_items,
78                available: available_count,
79                select_target: SelectTarget::Datanode
80            }
81        );
82
83        weighted_choose.choose_multiple(min_required_items)
84    }
85}
86
#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use common_meta::peer::Peer;

    use crate::selector::SelectorOptions;
    use crate::selector::common::{choose_items, filter_out_excluded_peers};
    use crate::selector::weighted_choose::{RandomWeightedChoose, WeightedItem};

    /// Builds a weighted entry (weight `1.0`) for a peer with the given id and address.
    fn unit_weight_peer(id: u64, addr: &str) -> WeightedItem<Peer> {
        WeightedItem {
            item: Peer {
                id,
                addr: addr.to_string(),
            },
            weight: 1.0,
        }
    }

    /// Builds selector options with no excluded peers and no workload filter.
    fn selector_opts(min_required_items: usize, allow_duplication: bool) -> SelectorOptions {
        SelectorOptions {
            min_required_items,
            allow_duplication,
            exclude_peer_ids: HashSet::new(),
            workload_filter: None,
        }
    }

    #[test]
    fn test_choose_peers() {
        // Five equally weighted peers with ids 1 through 5.
        let weight_array: Vec<_> = (1..=5)
            .map(|id| unit_weight_peer(id, "127.0.0.1:3001"))
            .collect();

        // Without duplication, every request up to the pool size yields that many
        // distinct peers.
        for wanted in 1..=5 {
            let opts = selector_opts(wanted, false);
            let mut chooser = RandomWeightedChoose::new(weight_array.clone());

            let distinct: HashSet<_> = choose_items(&opts, &mut chooser)
                .unwrap()
                .into_iter()
                .collect();

            assert_eq!(wanted, distinct.len());
        }

        // Without duplication, asking for more peers than exist must fail.
        let opts = selector_opts(6, false);
        let mut chooser = RandomWeightedChoose::new(weight_array.clone());
        let outcome = choose_items(&opts, &mut chooser);
        assert!(outcome.is_err());

        // With duplication, any requested amount is satisfied exactly.
        for wanted in 1..=50 {
            let opts = selector_opts(wanted, true);
            let mut chooser = RandomWeightedChoose::new(weight_array.clone());

            let picked = choose_items(&opts, &mut chooser).unwrap();

            assert_eq!(wanted, picked.len());
        }
    }

    #[test]
    fn test_filter_out_excluded_peers() {
        let mut weight_array = vec![
            unit_weight_peer(1, "127.0.0.1:3001"),
            unit_weight_peer(2, "127.0.0.1:3002"),
        ];

        // Excluding peer 1 must leave only peer 2 behind.
        let exclude_peer_ids = HashSet::from([1]);
        filter_out_excluded_peers(&mut weight_array, &exclude_peer_ids);

        assert_eq!(weight_array.len(), 1);
        assert_eq!(weight_array[0].item.id, 2);
    }
}