meta_srv/selector/
load_based.rs1use std::collections::HashMap;
16
17use common_meta::datanode::{DatanodeStatKey, DatanodeStatValue};
18use common_meta::peer::Peer;
19use common_telemetry::debug;
20use snafu::ResultExt;
21
22use crate::cluster::MetaPeerClientRef;
23use crate::error::{ListActiveDatanodesSnafu, Result};
24use crate::metasrv::SelectorContext;
25use crate::selector::common::{choose_items, filter_out_excluded_peers};
26use crate::selector::weight_compute::WeightCompute;
27use crate::selector::weighted_choose::RandomWeightedChoose;
28use crate::selector::{Selector, SelectorOptions};
29
30pub struct LoadBasedSelector<C> {
31 weight_compute: C,
32 meta_peer_client: MetaPeerClientRef,
33}
34
35impl<C> LoadBasedSelector<C> {
36 pub fn new(weight_compute: C, meta_peer_client: MetaPeerClientRef) -> Self {
37 Self {
38 weight_compute,
39 meta_peer_client,
40 }
41 }
42}
43
44#[async_trait::async_trait]
45impl<C> Selector for LoadBasedSelector<C>
46where
47 C: WeightCompute<Source = HashMap<DatanodeStatKey, DatanodeStatValue>>,
48{
49 type Context = SelectorContext;
50 type Output = Vec<Peer>;
51
52 async fn select(&self, ctx: &Self::Context, opts: SelectorOptions) -> Result<Self::Output> {
53 let alive_datanodes = ctx
55 .peer_discovery
56 .active_datanodes(opts.workload_filter)
57 .await
58 .context(ListActiveDatanodesSnafu)?;
59
60 let stat_keys = alive_datanodes
62 .iter()
63 .map(|k| DatanodeStatKey { node_id: k.id })
64 .collect();
65 let stat_kvs = filter_out_expired_datanode(
66 self.meta_peer_client.get_dn_stat_kvs(stat_keys).await?,
67 &alive_datanodes,
68 );
69
70 let mut weight_array = self.weight_compute.compute(&stat_kvs);
72
73 filter_out_excluded_peers(&mut weight_array, &opts.exclude_peer_ids);
75 let mut weighted_choose = RandomWeightedChoose::new(weight_array);
77 let selected = choose_items(&opts, &mut weighted_choose)?;
78
79 debug!(
80 "LoadBasedSelector select peers: {:?}, opts: {:?}.",
81 selected, opts,
82 );
83
84 Ok(selected)
85 }
86}
87
88fn filter_out_expired_datanode(
89 mut stat_kvs: HashMap<DatanodeStatKey, DatanodeStatValue>,
90 datanodes: &[Peer],
91) -> HashMap<DatanodeStatKey, DatanodeStatValue> {
92 datanodes
93 .iter()
94 .filter_map(|p| stat_kvs.remove_entry(&DatanodeStatKey { node_id: p.id }))
95 .collect()
96}
97
98#[cfg(test)]
99mod tests {
100 use std::collections::HashMap;
101
102 use common_meta::datanode::{DatanodeStatKey, DatanodeStatValue};
103 use common_meta::peer::Peer;
104
105 use crate::selector::load_based::filter_out_expired_datanode;
106
107 #[test]
108 fn test_filter_out_expired_datanode() {
109 let mut stat_kvs = HashMap::new();
110 stat_kvs.insert(
111 DatanodeStatKey { node_id: 0 },
112 DatanodeStatValue { stats: vec![] },
113 );
114 stat_kvs.insert(
115 DatanodeStatKey { node_id: 1 },
116 DatanodeStatValue { stats: vec![] },
117 );
118 stat_kvs.insert(
119 DatanodeStatKey { node_id: 2 },
120 DatanodeStatValue { stats: vec![] },
121 );
122
123 let lease_kvs = vec![Peer::new(1, "127.0.0.1:3002".to_string())];
124 let alive_stat_kvs = filter_out_expired_datanode(stat_kvs, &lease_kvs);
125
126 assert_eq!(1, alive_stat_kvs.len());
127 assert!(alive_stat_kvs.contains_key(&DatanodeStatKey { node_id: 1 }));
128 }
129}