meta_srv/selector/
common.rs1use std::collections::HashSet;
16
17use common_meta::peer::Peer;
18use snafu::ensure;
19
20use crate::error;
21use crate::error::Result;
22use crate::metasrv::SelectTarget;
23use crate::selector::SelectorOptions;
24use crate::selector::weighted_choose::{WeightedChoose, WeightedItem};
25
26pub fn filter_out_excluded_peers(
28 weight_array: &mut Vec<WeightedItem<Peer>>,
29 exclude_peer_ids: &HashSet<u64>,
30) {
31 weight_array.retain(|peer| !exclude_peer_ids.contains(&peer.item.id));
32}
33
34pub fn choose_items<W>(opts: &SelectorOptions, weighted_choose: &mut W) -> Result<Vec<Peer>>
36where
37 W: WeightedChoose<Peer>,
38{
39 let min_required_items = opts.min_required_items;
40 ensure!(
41 !weighted_choose.is_empty(),
42 error::NoEnoughAvailableNodeSnafu {
43 required: min_required_items,
44 available: 0_usize,
45 select_target: SelectTarget::Datanode
46 }
47 );
48
49 if min_required_items == 1 {
50 return Ok(vec![weighted_choose.choose_one()?]);
52 }
53
54 let available_count = weighted_choose.len();
55
56 if opts.allow_duplication {
57 let complete_batches = min_required_items / available_count;
60 let leftover_items = min_required_items % available_count;
61 if complete_batches == 0 {
62 return weighted_choose.choose_multiple(leftover_items);
63 }
64
65 let mut result = Vec::with_capacity(min_required_items);
66 for _ in 0..complete_batches {
67 result.extend(weighted_choose.choose_multiple(available_count)?);
68 }
69 result.extend(weighted_choose.choose_multiple(leftover_items)?);
70
71 Ok(result)
72 } else {
73 ensure!(
75 available_count >= min_required_items,
76 error::NoEnoughAvailableNodeSnafu {
77 required: min_required_items,
78 available: available_count,
79 select_target: SelectTarget::Datanode
80 }
81 );
82
83 weighted_choose.choose_multiple(min_required_items)
84 }
85}
86
87#[cfg(test)]
88mod tests {
89 use std::collections::HashSet;
90
91 use common_meta::peer::Peer;
92
93 use crate::selector::SelectorOptions;
94 use crate::selector::common::{choose_items, filter_out_excluded_peers};
95 use crate::selector::weighted_choose::{RandomWeightedChoose, WeightedItem};
96
97 #[test]
98 fn test_choose_peers() {
99 let weight_array = vec![
100 WeightedItem {
101 item: Peer {
102 id: 1,
103 addr: "127.0.0.1:3001".to_string(),
104 },
105 weight: 1.0,
106 },
107 WeightedItem {
108 item: Peer {
109 id: 2,
110 addr: "127.0.0.1:3001".to_string(),
111 },
112 weight: 1.0,
113 },
114 WeightedItem {
115 item: Peer {
116 id: 3,
117 addr: "127.0.0.1:3001".to_string(),
118 },
119 weight: 1.0,
120 },
121 WeightedItem {
122 item: Peer {
123 id: 4,
124 addr: "127.0.0.1:3001".to_string(),
125 },
126 weight: 1.0,
127 },
128 WeightedItem {
129 item: Peer {
130 id: 5,
131 addr: "127.0.0.1:3001".to_string(),
132 },
133 weight: 1.0,
134 },
135 ];
136
137 for i in 1..=5 {
138 let opts = SelectorOptions {
139 min_required_items: i,
140 allow_duplication: false,
141 exclude_peer_ids: HashSet::new(),
142 workload_filter: None,
143 };
144
145 let selected_peers: HashSet<_> =
146 choose_items(&opts, &mut RandomWeightedChoose::new(weight_array.clone()))
147 .unwrap()
148 .into_iter()
149 .collect();
150
151 assert_eq!(i, selected_peers.len());
152 }
153
154 let opts = SelectorOptions {
155 min_required_items: 6,
156 allow_duplication: false,
157 exclude_peer_ids: HashSet::new(),
158 workload_filter: None,
159 };
160
161 let selected_result =
162 choose_items(&opts, &mut RandomWeightedChoose::new(weight_array.clone()));
163 assert!(selected_result.is_err());
164
165 for i in 1..=50 {
166 let opts = SelectorOptions {
167 min_required_items: i,
168 allow_duplication: true,
169 exclude_peer_ids: HashSet::new(),
170 workload_filter: None,
171 };
172
173 let selected_peers =
174 choose_items(&opts, &mut RandomWeightedChoose::new(weight_array.clone())).unwrap();
175
176 assert_eq!(i, selected_peers.len());
177 }
178 }
179
180 #[test]
181 fn test_filter_out_excluded_peers() {
182 let mut weight_array = vec![
183 WeightedItem {
184 item: Peer {
185 id: 1,
186 addr: "127.0.0.1:3001".to_string(),
187 },
188 weight: 1.0,
189 },
190 WeightedItem {
191 item: Peer {
192 id: 2,
193 addr: "127.0.0.1:3002".to_string(),
194 },
195 weight: 1.0,
196 },
197 ];
198
199 let exclude_peer_ids = HashSet::from([1]);
200 filter_out_excluded_peers(&mut weight_array, &exclude_peer_ids);
201
202 assert_eq!(weight_array.len(), 1);
203 assert_eq!(weight_array[0].item.id, 2);
204 }
205}