// meta_srv/selector/common.rs

use std::collections::HashSet;

use common_meta::peer::Peer;
use snafu::ensure;

use crate::error;
use crate::error::Result;
use crate::metasrv::SelectTarget;
use crate::selector::weighted_choose::{WeightedChoose, WeightedItem};
use crate::selector::SelectorOptions;
25
26pub fn filter_out_excluded_peers(
28 weight_array: &mut Vec<WeightedItem<Peer>>,
29 exclude_peer_ids: &HashSet<u64>,
30) {
31 weight_array.retain(|peer| !exclude_peer_ids.contains(&peer.item.id));
32}
33
34pub fn choose_items<W>(opts: &SelectorOptions, weighted_choose: &mut W) -> Result<Vec<Peer>>
36where
37 W: WeightedChoose<Peer>,
38{
39 let min_required_items = opts.min_required_items;
40 ensure!(
41 !weighted_choose.is_empty(),
42 error::NoEnoughAvailableNodeSnafu {
43 required: min_required_items,
44 available: 0_usize,
45 select_target: SelectTarget::Datanode
46 }
47 );
48
49 if min_required_items == 1 {
50 return Ok(vec![weighted_choose.choose_one()?]);
52 }
53
54 let available_count = weighted_choose.len();
55
56 if opts.allow_duplication {
57 let complete_batches = min_required_items / available_count;
60 let leftover_items = min_required_items % available_count;
61 if complete_batches == 0 {
62 return weighted_choose.choose_multiple(leftover_items);
63 }
64
65 let mut result = Vec::with_capacity(min_required_items);
66 for _ in 0..complete_batches {
67 result.extend(weighted_choose.choose_multiple(available_count)?);
68 }
69 result.extend(weighted_choose.choose_multiple(leftover_items)?);
70
71 Ok(result)
72 } else {
73 ensure!(
75 available_count >= min_required_items,
76 error::NoEnoughAvailableNodeSnafu {
77 required: min_required_items,
78 available: available_count,
79 select_target: SelectTarget::Datanode
80 }
81 );
82
83 weighted_choose.choose_multiple(min_required_items)
84 }
85}
86
#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use common_meta::peer::Peer;

    use crate::selector::common::{choose_items, filter_out_excluded_peers};
    use crate::selector::weighted_choose::{RandomWeightedChoose, WeightedItem};
    use crate::selector::SelectorOptions;

    /// Builds a weight-1.0 entry for a peer with the given id and address.
    fn unit_weight_peer(id: u64, addr: &str) -> WeightedItem<Peer> {
        WeightedItem {
            item: Peer {
                id,
                addr: addr.to_string(),
            },
            weight: 1.0,
        }
    }

    /// Builds selector options with no excluded peers.
    fn selector_options(min_required_items: usize, allow_duplication: bool) -> SelectorOptions {
        SelectorOptions {
            min_required_items,
            allow_duplication,
            exclude_peer_ids: HashSet::new(),
        }
    }

    #[test]
    fn test_choose_peers() {
        // Five equally weighted peers with ids 1 through 5.
        let weight_array: Vec<_> = (1..=5)
            .map(|id| unit_weight_peer(id, "127.0.0.1:3001"))
            .collect();

        // Without duplication, requesting up to the pool size yields that many
        // distinct peers.
        for i in 1..=5 {
            let opts = selector_options(i, false);
            let mut chooser = RandomWeightedChoose::new(weight_array.clone());

            let selected_peers = choose_items(&opts, &mut chooser).unwrap();
            let distinct_peers: HashSet<_> = selected_peers.into_iter().collect();

            assert_eq!(i, distinct_peers.len());
        }

        // Without duplication, requesting more peers than the pool holds must fail.
        let opts = selector_options(6, false);
        let mut chooser = RandomWeightedChoose::new(weight_array.clone());
        let selected_result = choose_items(&opts, &mut chooser);
        assert!(selected_result.is_err());

        // With duplication, any requested count is satisfied exactly.
        for i in 1..=50 {
            let opts = selector_options(i, true);
            let mut chooser = RandomWeightedChoose::new(weight_array.clone());

            let selected_peers = choose_items(&opts, &mut chooser).unwrap();

            assert_eq!(i, selected_peers.len());
        }
    }

    #[test]
    fn test_filter_out_excluded_peers() {
        let mut weight_array = vec![
            unit_weight_peer(1, "127.0.0.1:3001"),
            unit_weight_peer(2, "127.0.0.1:3002"),
        ];

        // Excluding peer 1 must leave only peer 2 behind.
        let exclude_peer_ids = HashSet::from([1]);
        filter_out_excluded_peers(&mut weight_array, &exclude_peer_ids);

        assert_eq!(weight_array.len(), 1);
        assert_eq!(weight_array[0].item.id, 2);
    }
}