meta_srv/selector/
common.rs1use std::collections::HashSet;
16
17use common_meta::peer::Peer;
18use snafu::ensure;
19
20use crate::error;
21use crate::error::Result;
22use crate::metasrv::SelectTarget;
23use crate::selector::SelectorOptions;
24use crate::selector::weighted_choose::{WeightedChoose, WeightedItem};
25
26pub fn filter_out_excluded_peers(
28 weight_array: &mut Vec<WeightedItem<Peer>>,
29 exclude_peer_ids: &HashSet<u64>,
30) {
31 weight_array.retain(|peer| !exclude_peer_ids.contains(&peer.item.id));
32}
33
34pub fn choose_items<W>(opts: &SelectorOptions, weighted_choose: &mut W) -> Result<Vec<Peer>>
36where
37 W: WeightedChoose<Peer>,
38{
39 let min_required_items = opts.min_required_items;
40 ensure!(
41 !weighted_choose.is_empty(),
42 error::NoEnoughAvailableNodeSnafu {
43 required: min_required_items,
44 available: 0_usize,
45 select_target: SelectTarget::Datanode
46 }
47 );
48
49 if min_required_items == 1 {
50 return Ok(vec![weighted_choose.choose_one()?]);
52 }
53
54 let available_count = weighted_choose.len();
55
56 if opts.allow_duplication {
57 let complete_batches = min_required_items / available_count;
60 let leftover_items = min_required_items % available_count;
61 if complete_batches == 0 {
62 return weighted_choose.choose_multiple(leftover_items);
63 }
64
65 let mut result = Vec::with_capacity(min_required_items);
66 for _ in 0..complete_batches {
67 result.extend(weighted_choose.choose_multiple(available_count)?);
68 }
69 result.extend(weighted_choose.choose_multiple(leftover_items)?);
70
71 Ok(result)
72 } else {
73 ensure!(
75 available_count >= min_required_items,
76 error::NoEnoughAvailableNodeSnafu {
77 required: min_required_items,
78 available: available_count,
79 select_target: SelectTarget::Datanode
80 }
81 );
82
83 weighted_choose.choose_multiple(min_required_items)
84 }
85}
86
#[cfg(test)]
mod tests {
    use std::collections::{HashMap, HashSet};

    use common_meta::peer::Peer;

    use crate::selector::SelectorOptions;
    use crate::selector::common::{choose_items, filter_out_excluded_peers};
    use crate::selector::weighted_choose::{RandomWeightedChoose, WeightedItem};

    /// Builds a weighted entry with weight `1.0` for a peer with the given id and address.
    fn unit_weight_peer(id: u64, addr: &str) -> WeightedItem<Peer> {
        WeightedItem {
            item: Peer {
                id,
                addr: addr.to_string(),
            },
            weight: 1.0,
        }
    }

    /// Builds selector options with no exclusions, no workload filter and no extensions.
    fn plain_options(min_required_items: usize, allow_duplication: bool) -> SelectorOptions {
        SelectorOptions {
            min_required_items,
            allow_duplication,
            exclude_peer_ids: HashSet::new(),
            workload_filter: None,
            extensions: HashMap::new(),
        }
    }

    #[test]
    fn test_choose_peers() {
        // Five peers with ids 1..=5, all sharing the same address and weight.
        let weight_array: Vec<_> = (1..=5)
            .map(|id| unit_weight_peer(id, "127.0.0.1:3001"))
            .collect();

        // Without duplication: any request up to the pool size yields that many
        // distinct peers.
        for wanted in 1..=5 {
            let opts = plain_options(wanted, false);
            let mut chooser = RandomWeightedChoose::new(weight_array.clone());

            let distinct_peers: HashSet<_> = choose_items(&opts, &mut chooser)
                .unwrap()
                .into_iter()
                .collect();

            assert_eq!(wanted, distinct_peers.len());
        }

        // Without duplication: asking for more than the pool holds is an error.
        let opts = plain_options(6, false);
        let mut chooser = RandomWeightedChoose::new(weight_array.clone());
        let oversized_request = choose_items(&opts, &mut chooser);
        assert!(oversized_request.is_err());

        // With duplication: the request may exceed the pool size.
        for wanted in 1..=50 {
            let opts = plain_options(wanted, true);
            let mut chooser = RandomWeightedChoose::new(weight_array.clone());

            let picked = choose_items(&opts, &mut chooser).unwrap();

            assert_eq!(wanted, picked.len());
        }
    }

    #[test]
    fn test_filter_out_excluded_peers() {
        let mut weight_array = vec![
            unit_weight_peer(1, "127.0.0.1:3001"),
            unit_weight_peer(2, "127.0.0.1:3002"),
        ];

        let exclude_peer_ids = HashSet::from([1]);
        filter_out_excluded_peers(&mut weight_array, &exclude_peer_ids);

        // Only peer 2 should remain.
        let remaining_ids: Vec<u64> = weight_array.iter().map(|entry| entry.item.id).collect();
        assert_eq!(remaining_ids, vec![2]);
    }
}