1use std::collections::HashMap;
16
17use common_meta::datanode::{DatanodeStatKey, DatanodeStatValue};
18use common_meta::peer::Peer;
19use itertools::{Itertools, MinMaxResult};
20
21use crate::selector::weighted_choose::WeightedItem;
22
/// Turns a source of statistics into a list of weighted peers.
///
/// Implementors choose the concrete statistics type through [`Self::Source`].
pub trait WeightCompute: Send + Sync {
    /// The statistics input consumed by [`WeightCompute::compute`].
    type Source;

    /// Computes one [`WeightedItem`] per selectable [`Peer`] from `stat_kvs`.
    fn compute(&self, stat_kvs: &Self::Source) -> Vec<WeightedItem<Peer>>;
}
29
/// A stateless [`WeightCompute`] implementation that weights each peer by the
/// region count reported in its datanode stats.
pub struct RegionNumsBasedWeightCompute;
38
39impl WeightCompute for RegionNumsBasedWeightCompute {
40 type Source = HashMap<DatanodeStatKey, DatanodeStatValue>;
41
42 fn compute(
43 &self,
44 stat_kvs: &HashMap<DatanodeStatKey, DatanodeStatValue>,
45 ) -> Vec<WeightedItem<Peer>> {
46 let mut region_nums = Vec::with_capacity(stat_kvs.len());
47 let mut peers = Vec::with_capacity(stat_kvs.len());
48
49 for (stat_k, stat_v) in stat_kvs {
50 let Some(region_num) = stat_v.region_num() else {
51 continue;
52 };
53 let Some(node_addr) = stat_v.node_addr() else {
54 continue;
55 };
56
57 let peer = Peer {
58 id: stat_k.node_id,
59 addr: node_addr,
60 };
61
62 region_nums.push(region_num);
63 peers.push(peer);
64 }
65
66 if region_nums.is_empty() {
67 return vec![];
68 }
69
70 let (min_weight, max_weight) = match region_nums.iter().minmax() {
71 MinMaxResult::NoElements => unreachable!(),
73 MinMaxResult::OneElement(minmax) => (*minmax, *minmax),
74 MinMaxResult::MinMax(min, max) => (*min, *max),
75 };
76
77 let base_weight = match max_weight - min_weight {
78 0 => 1,
79 x => x,
80 };
81
82 peers
83 .into_iter()
84 .zip(region_nums)
85 .map(|(peer, region_num)| WeightedItem {
86 item: peer,
87 weight: (max_weight - region_num + base_weight) as f64,
88 })
89 .collect()
90 }
91}
92
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use common_meta::datanode::{
        DatanodeStatKey, DatanodeStatValue, RegionManifestInfo, RegionStat, Stat,
    };
    use common_meta::peer::Peer;
    use store_api::region_engine::RegionRole;
    use store_api::storage::RegionId;

    use super::{RegionNumsBasedWeightCompute, WeightCompute};

    /// Three datanodes with 11, 12 and 13 regions must get weights 4, 3 and 2:
    /// `max = 13`, `base = 13 - 11 = 2`, `weight = max - region_num + base`.
    #[test]
    fn test_weight_compute() {
        let mut stat_kvs: HashMap<DatanodeStatKey, DatanodeStatValue> = HashMap::default();
        stat_kvs.insert(
            DatanodeStatKey { node_id: 1 },
            DatanodeStatValue {
                stats: vec![mock_stat("127.0.0.1:3001", 11, 111)],
            },
        );
        stat_kvs.insert(
            DatanodeStatKey { node_id: 2 },
            DatanodeStatValue {
                stats: vec![mock_stat("127.0.0.1:3002", 12, 112)],
            },
        );
        stat_kvs.insert(
            DatanodeStatKey { node_id: 3 },
            DatanodeStatValue {
                stats: vec![mock_stat("127.0.0.1:3003", 13, 113)],
            },
        );

        let compute = RegionNumsBasedWeightCompute;
        let weight_array = compute.compute(&stat_kvs);

        let mut expected = HashMap::new();
        expected.insert(
            Peer {
                id: 1,
                addr: "127.0.0.1:3001".to_string(),
            },
            4,
        );
        expected.insert(
            Peer {
                id: 2,
                addr: "127.0.0.1:3002".to_string(),
            },
            3,
        );
        expected.insert(
            Peer {
                id: 3,
                addr: "127.0.0.1:3003".to_string(),
            },
            2,
        );

        // Guard against a vacuous pass: every datanode must yield a weight.
        assert_eq!(expected.len(), weight_array.len());
        for weighted in weight_array.iter() {
            let want = expected
                .get(&weighted.item)
                .expect("compute returned a peer that was not in the input");
            assert_eq!(*want, weighted.weight as usize);
        }
    }

    /// Builds a `Stat` for a datanode at `addr` that reports `region_num`
    /// regions and carries a single leader region stat with id `region_id`.
    fn mock_stat(addr: &str, region_num: u64, region_id: u64) -> Stat {
        Stat {
            addr: addr.to_string(),
            region_num,
            region_stats: vec![RegionStat {
                id: RegionId::from_u64(region_id),
                rcus: 1,
                wcus: 1,
                approximate_bytes: 1,
                engine: "mito2".to_string(),
                role: RegionRole::Leader,
                num_rows: 0,
                memtable_size: 0,
                manifest_size: 0,
                sst_size: 0,
                index_size: 0,
                region_manifest: RegionManifestInfo::Mito {
                    manifest_version: 0,
                    flushed_entry_id: 0,
                },
                data_topic_latest_entry_id: 0,
                metadata_topic_latest_entry_id: 0,
            }],
            ..Default::default()
        }
    }
}