meta_srv/selector/
weight_compute.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::collections::HashMap;
16
17use common_meta::datanode::{DatanodeStatKey, DatanodeStatValue};
18use common_meta::peer::Peer;
19use itertools::{Itertools, MinMaxResult};
20
21use crate::selector::weighted_choose::WeightedItem;
22
23/// The [`WeightCompute`] trait is used to compute the weight array by heartbeats.
24pub trait WeightCompute: Send + Sync {
25    type Source;
26
27    fn compute(&self, stat_kvs: &Self::Source) -> Vec<WeightedItem<Peer>>;
28}
29
/// The [`RegionNumsBasedWeightCompute`] calculates a weighted list based on the region number
/// obtained from the heartbeat: the fewer regions a datanode holds, the larger its weight.
///
/// # How to calculate the weighted list?
/// weight = max_region_num - current_region_num + base_weight;
///
/// where `base_weight = max_region_num - min_region_num`, or `1` when every datanode reports
/// the same region number, so that no datanode ends up with a zero weight.
#[derive(Debug, Clone, Copy, Default)]
pub struct RegionNumsBasedWeightCompute;
38
39impl WeightCompute for RegionNumsBasedWeightCompute {
40    type Source = HashMap<DatanodeStatKey, DatanodeStatValue>;
41
42    fn compute(
43        &self,
44        stat_kvs: &HashMap<DatanodeStatKey, DatanodeStatValue>,
45    ) -> Vec<WeightedItem<Peer>> {
46        let mut region_nums = Vec::with_capacity(stat_kvs.len());
47        let mut peers = Vec::with_capacity(stat_kvs.len());
48
49        for (stat_k, stat_v) in stat_kvs {
50            let Some(region_num) = stat_v.region_num() else {
51                continue;
52            };
53            let Some(node_addr) = stat_v.node_addr() else {
54                continue;
55            };
56
57            let peer = Peer {
58                id: stat_k.node_id,
59                addr: node_addr,
60            };
61
62            region_nums.push(region_num);
63            peers.push(peer);
64        }
65
66        if region_nums.is_empty() {
67            return vec![];
68        }
69
70        let (min_weight, max_weight) = match region_nums.iter().minmax() {
71            // unreachable safety: region_nums is not empty
72            MinMaxResult::NoElements => unreachable!(),
73            MinMaxResult::OneElement(minmax) => (*minmax, *minmax),
74            MinMaxResult::MinMax(min, max) => (*min, *max),
75        };
76
77        let base_weight = match max_weight - min_weight {
78            0 => 1,
79            x => x,
80        };
81
82        peers
83            .into_iter()
84            .zip(region_nums)
85            .map(|(peer, region_num)| WeightedItem {
86                item: peer,
87                weight: (max_weight - region_num + base_weight) as f64,
88            })
89            .collect()
90    }
91}
92
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use common_meta::datanode::{
        DatanodeStatKey, DatanodeStatValue, RegionManifestInfo, RegionStat, Stat,
    };
    use common_meta::peer::Peer;
    use store_api::region_engine::RegionRole;
    use store_api::storage::RegionId;

    use super::{RegionNumsBasedWeightCompute, WeightCompute};

    #[test]
    fn test_weight_compute() {
        let stat_kvs = mock_stat_kvs(&[
            (1, "127.0.0.1:3001", 11),
            (2, "127.0.0.1:3002", 12),
            (3, "127.0.0.1:3003", 13),
        ]);

        let weight_array = RegionNumsBasedWeightCompute.compute(&stat_kvs);

        // max = 13, min = 11 => base_weight = 2 and weight = 13 - region_num + 2.
        let expected: HashMap<Peer, f64> = [
            (mock_peer(1, "127.0.0.1:3001"), 4.0),
            (mock_peer(2, "127.0.0.1:3002"), 3.0),
            (mock_peer(3, "127.0.0.1:3003"), 2.0),
        ]
        .into_iter()
        .collect();

        // Guard against the loop below passing vacuously on an empty result.
        assert_eq!(expected.len(), weight_array.len());
        for weight in &weight_array {
            assert_eq!(expected[&weight.item], weight.weight);
        }
    }

    #[test]
    fn test_weight_compute_empty() {
        let stat_kvs: HashMap<DatanodeStatKey, DatanodeStatValue> = HashMap::default();

        let weight_array = RegionNumsBasedWeightCompute.compute(&stat_kvs);

        assert!(weight_array.is_empty());
    }

    #[test]
    fn test_weight_compute_equal_region_nums() {
        let stat_kvs = mock_stat_kvs(&[(1, "127.0.0.1:3001", 5), (2, "127.0.0.1:3002", 5)]);

        let weight_array = RegionNumsBasedWeightCompute.compute(&stat_kvs);

        // max == min, so the base weight falls back to 1 and every peer weighs 1.
        assert_eq!(2, weight_array.len());
        for weight in &weight_array {
            assert_eq!(1.0, weight.weight);
        }
    }

    /// Builds the peer expected for the datanode `id` listening on `addr`.
    fn mock_peer(id: u64, addr: &str) -> Peer {
        Peer {
            id,
            addr: addr.to_string(),
        }
    }

    /// Builds a heartbeat map from `(node_id, addr, region_num)` triples, one stat per node.
    fn mock_stat_kvs(nodes: &[(u64, &str, u64)]) -> HashMap<DatanodeStatKey, DatanodeStatValue> {
        nodes
            .iter()
            .map(|&(node_id, addr, region_num)| {
                let stat_key = DatanodeStatKey { node_id };
                let stat_val = DatanodeStatValue {
                    stats: vec![mock_stat(node_id, addr, region_num)],
                };
                (stat_key, stat_val)
            })
            .collect()
    }

    /// Builds a [`Stat`] for `addr` that reports `region_num` regions and carries a single
    /// leader region stat whose id is derived from `node_id` (111 for node 1, and so on).
    fn mock_stat(node_id: u64, addr: &str, region_num: u64) -> Stat {
        Stat {
            addr: addr.to_string(),
            region_num,
            region_stats: vec![RegionStat {
                id: RegionId::from_u64(110 + node_id),
                rcus: 1,
                wcus: 1,
                approximate_bytes: 1,
                engine: "mito2".to_string(),
                role: RegionRole::Leader,
                num_rows: 0,
                memtable_size: 0,
                manifest_size: 0,
                sst_size: 0,
                index_size: 0,
                region_manifest: RegionManifestInfo::Mito {
                    manifest_version: 0,
                    flushed_entry_id: 0,
                },
                data_topic_latest_entry_id: 0,
                metadata_topic_latest_entry_id: 0,
            }],
            ..Default::default()
        }
    }
}