1use std::collections::HashMap;
16
17use common_meta::datanode::{DatanodeStatKey, DatanodeStatValue};
18use common_meta::peer::Peer;
19use itertools::{Itertools, MinMaxResult};
20
21use crate::selector::weighted_choose::WeightedItem;
22
23pub trait WeightCompute: Send + Sync {
25 type Source;
26
27 fn compute(&self, stat_kvs: &Self::Source) -> Vec<WeightedItem<Peer>>;
28}
29
/// Weight strategy driven by the region count each datanode reports.
///
/// Through its [`WeightCompute`] implementation, a datanode holding fewer regions receives a
/// larger weight than one holding more.
pub struct RegionNumsBasedWeightCompute;
38
39impl WeightCompute for RegionNumsBasedWeightCompute {
40 type Source = HashMap<DatanodeStatKey, DatanodeStatValue>;
41
42 fn compute(
43 &self,
44 stat_kvs: &HashMap<DatanodeStatKey, DatanodeStatValue>,
45 ) -> Vec<WeightedItem<Peer>> {
46 let mut region_nums = Vec::with_capacity(stat_kvs.len());
47 let mut peers = Vec::with_capacity(stat_kvs.len());
48
49 for (stat_k, stat_v) in stat_kvs {
50 let Some(region_num) = stat_v.region_num() else {
51 continue;
52 };
53 let Some(node_addr) = stat_v.node_addr() else {
54 continue;
55 };
56
57 let peer = Peer {
58 id: stat_k.node_id,
59 addr: node_addr,
60 };
61
62 region_nums.push(region_num);
63 peers.push(peer);
64 }
65
66 if region_nums.is_empty() {
67 return vec![];
68 }
69
70 let (min_weight, max_weight) = match region_nums.iter().minmax() {
71 MinMaxResult::NoElements => unreachable!(),
73 MinMaxResult::OneElement(minmax) => (*minmax, *minmax),
74 MinMaxResult::MinMax(min, max) => (*min, *max),
75 };
76
77 let base_weight = match max_weight - min_weight {
78 0 => 1,
79 x => x,
80 };
81
82 peers
83 .into_iter()
84 .zip(region_nums)
85 .map(|(peer, region_num)| WeightedItem {
86 item: peer,
87 weight: (max_weight - region_num + base_weight) as f64,
88 })
89 .collect()
90 }
91}
92
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use common_meta::datanode::{
        DatanodeStatKey, DatanodeStatValue, RegionManifestInfo, RegionStat, Stat,
    };
    use common_meta::peer::Peer;
    use store_api::region_engine::RegionRole;
    use store_api::storage::RegionId;

    use super::{RegionNumsBasedWeightCompute, WeightCompute};

    /// Datanodes reporting 11, 12 and 13 regions must be weighted 4, 3 and 2 respectively:
    /// `max - n + (max - min)` with `max = 13` and `min = 11`.
    #[test]
    fn test_weight_compute() {
        let stat_kvs: HashMap<DatanodeStatKey, DatanodeStatValue> = [
            (1, mock_stat_1()),
            (2, mock_stat_2()),
            (3, mock_stat_3()),
        ]
        .into_iter()
        .map(|(node_id, stat)| {
            (
                DatanodeStatKey { node_id },
                DatanodeStatValue { stats: vec![stat] },
            )
        })
        .collect();

        let compute = RegionNumsBasedWeightCompute;
        let weight_array = compute.compute(&stat_kvs);

        let expected: HashMap<Peer, f64> = [
            (1, "127.0.0.1:3001", 4.0),
            (2, "127.0.0.1:3002", 3.0),
            (3, "127.0.0.1:3003", 2.0),
        ]
        .into_iter()
        .map(|(id, addr, weight)| {
            (
                Peer {
                    id,
                    addr: addr.to_string(),
                },
                weight,
            )
        })
        .collect();

        // Without this check an empty result would make the loop below pass vacuously.
        assert_eq!(expected.len(), weight_array.len());
        for weighted in &weight_array {
            let expected_weight = expected
                .get(&weighted.item)
                .expect("compute returned a peer that was not in the input");
            // Weights are whole numbers cast to `f64`, so exact comparison is sound.
            assert_eq!(*expected_weight, weighted.weight);
        }
    }

    /// Builds the single leader region stat shared by all mock datanodes; only the id varies.
    fn mock_region_stat(region_id: u64) -> RegionStat {
        RegionStat {
            id: RegionId::from_u64(region_id),
            rcus: 1,
            wcus: 1,
            approximate_bytes: 1,
            engine: "mito2".to_string(),
            role: RegionRole::Leader,
            num_rows: 0,
            memtable_size: 0,
            manifest_size: 0,
            sst_size: 0,
            sst_num: 0,
            index_size: 0,
            region_manifest: RegionManifestInfo::Mito {
                manifest_version: 0,
                flushed_entry_id: 0,
            },
            data_topic_latest_entry_id: 0,
            metadata_topic_latest_entry_id: 0,
            written_bytes: 0,
        }
    }

    /// Datanode at `127.0.0.1:3001` reporting 11 regions.
    fn mock_stat_1() -> Stat {
        Stat {
            addr: "127.0.0.1:3001".to_string(),
            region_num: 11,
            region_stats: vec![mock_region_stat(111)],
            ..Default::default()
        }
    }

    /// Datanode at `127.0.0.1:3002` reporting 12 regions.
    fn mock_stat_2() -> Stat {
        Stat {
            addr: "127.0.0.1:3002".to_string(),
            region_num: 12,
            region_stats: vec![mock_region_stat(112)],
            ..Default::default()
        }
    }

    /// Datanode at `127.0.0.1:3003` reporting 13 regions.
    fn mock_stat_3() -> Stat {
        Stat {
            addr: "127.0.0.1:3003".to_string(),
            region_num: 13,
            region_stats: vec![mock_region_stat(113)],
            ..Default::default()
        }
    }
}