meta_srv/selector/
load_based.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16
17use common_meta::datanode::{DatanodeStatKey, DatanodeStatValue};
18use common_meta::peer::Peer;
19use common_telemetry::debug;
20use snafu::ResultExt;
21
22use crate::cluster::MetaPeerClientRef;
23use crate::error::{ListActiveDatanodesSnafu, Result};
24use crate::metasrv::SelectorContext;
25use crate::selector::common::{choose_items, filter_out_excluded_peers};
26use crate::selector::weight_compute::WeightCompute;
27use crate::selector::weighted_choose::RandomWeightedChoose;
28use crate::selector::{Selector, SelectorOptions};
29
/// Selector that chooses datanodes through a weighted choice, where the
/// weights are computed from the datanodes' stats.
///
/// `C` is the strategy that turns the collected stats into a weight array.
pub struct LoadBasedSelector<C> {
    /// Computes the weight array from the stat key-values of alive datanodes.
    weight_compute: C,
    /// Client used to fetch the datanode stat key-values.
    meta_peer_client: MetaPeerClientRef,
}
34
35impl<C> LoadBasedSelector<C> {
36    pub fn new(weight_compute: C, meta_peer_client: MetaPeerClientRef) -> Self {
37        Self {
38            weight_compute,
39            meta_peer_client,
40        }
41    }
42}
43
44#[async_trait::async_trait]
45impl<C> Selector for LoadBasedSelector<C>
46where
47    C: WeightCompute<Source = HashMap<DatanodeStatKey, DatanodeStatValue>>,
48{
49    type Context = SelectorContext;
50    type Output = Vec<Peer>;
51
52    async fn select(&self, ctx: &Self::Context, opts: SelectorOptions) -> Result<Self::Output> {
53        // 1. get alive datanodes.
54        let alive_datanodes = ctx
55            .peer_discovery
56            .active_datanodes(opts.workload_filter)
57            .await
58            .context(ListActiveDatanodesSnafu)?;
59
60        // 2. get stat kvs and filter out expired datanodes.
61        let stat_keys = alive_datanodes
62            .iter()
63            .map(|k| DatanodeStatKey { node_id: k.id })
64            .collect();
65        let stat_kvs = filter_out_expired_datanode(
66            self.meta_peer_client.get_dn_stat_kvs(stat_keys).await?,
67            &alive_datanodes,
68        );
69
70        // 3. compute weight array.
71        let mut weight_array = self.weight_compute.compute(&stat_kvs);
72
73        // 4. filter out excluded peers.
74        filter_out_excluded_peers(&mut weight_array, &opts.exclude_peer_ids);
75        // 5. choose peers by weight_array.
76        let mut weighted_choose = RandomWeightedChoose::new(weight_array);
77        let selected = choose_items(&opts, &mut weighted_choose)?;
78
79        debug!(
80            "LoadBasedSelector select peers: {:?}, opts: {:?}.",
81            selected, opts,
82        );
83
84        Ok(selected)
85    }
86}
87
88fn filter_out_expired_datanode(
89    mut stat_kvs: HashMap<DatanodeStatKey, DatanodeStatValue>,
90    datanodes: &[Peer],
91) -> HashMap<DatanodeStatKey, DatanodeStatValue> {
92    datanodes
93        .iter()
94        .filter_map(|p| stat_kvs.remove_entry(&DatanodeStatKey { node_id: p.id }))
95        .collect()
96}
97
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use common_meta::datanode::{DatanodeStatKey, DatanodeStatValue};
    use common_meta::peer::Peer;

    use crate::selector::load_based::filter_out_expired_datanode;

    /// Stats for nodes 0, 1 and 2 exist, but only node 1 is among the given
    /// peers, so only its entry must remain.
    #[test]
    fn test_filter_out_expired_datanode() {
        let stat_kvs: HashMap<_, _> = (0..3)
            .map(|node_id| (DatanodeStatKey { node_id }, DatanodeStatValue { stats: vec![] }))
            .collect();

        let alive_peers = [Peer::new(1, "127.0.0.1:3002".to_string())];
        let remaining = filter_out_expired_datanode(stat_kvs, &alive_peers);

        assert_eq!(1, remaining.len());
        assert!(remaining.contains_key(&DatanodeStatKey { node_id: 1 }));
    }
}