meta_srv/selector/
load_based.rs1use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17
18use common_meta::datanode::{DatanodeStatKey, DatanodeStatValue};
19use common_meta::key::TableMetadataManager;
20use common_meta::peer::Peer;
21use common_meta::rpc::router::find_leaders;
22use common_telemetry::{debug, info};
23use snafu::ResultExt;
24use table::metadata::TableId;
25
26use crate::error::{self, Result};
27use crate::key::{DatanodeLeaseKey, LeaseValue};
28use crate::lease;
29use crate::metasrv::SelectorContext;
30use crate::node_excluder::NodeExcluderRef;
31use crate::selector::common::{choose_items, filter_out_excluded_peers};
32use crate::selector::weight_compute::{RegionNumsBasedWeightCompute, WeightCompute};
33use crate::selector::weighted_choose::RandomWeightedChoose;
34use crate::selector::{Selector, SelectorOptions};
35
36pub struct LoadBasedSelector<C> {
37 weight_compute: C,
38 node_excluder: NodeExcluderRef,
39}
40
41impl<C> LoadBasedSelector<C> {
42 pub fn new(weight_compute: C, node_excluder: NodeExcluderRef) -> Self {
43 Self {
44 weight_compute,
45 node_excluder,
46 }
47 }
48}
49
50impl Default for LoadBasedSelector<RegionNumsBasedWeightCompute> {
51 fn default() -> Self {
52 Self {
53 weight_compute: RegionNumsBasedWeightCompute,
54 node_excluder: Arc::new(Vec::new()),
55 }
56 }
57}
58
59#[async_trait::async_trait]
60impl<C> Selector for LoadBasedSelector<C>
61where
62 C: WeightCompute<Source = HashMap<DatanodeStatKey, DatanodeStatValue>>,
63{
64 type Context = SelectorContext;
65 type Output = Vec<Peer>;
66
67 async fn select(&self, ctx: &Self::Context, opts: SelectorOptions) -> Result<Self::Output> {
68 let lease_kvs = lease::alive_datanodes(&ctx.meta_peer_client, ctx.datanode_lease_secs)
70 .with_condition(lease::is_datanode_accept_ingest_workload)
71 .await?;
72
73 let stat_keys = lease_kvs.keys().map(|k| k.into()).collect();
75 let stat_kvs = filter_out_expired_datanode(
76 ctx.meta_peer_client.get_dn_stat_kvs(stat_keys).await?,
77 &lease_kvs,
78 );
79
80 let stat_kvs = if let Some(table_id) = ctx.table_id {
82 let table_metadata_manager = TableMetadataManager::new(ctx.kv_backend.clone());
83 let leader_peer_ids = get_leader_peer_ids(&table_metadata_manager, table_id).await?;
84 let filter_result = filter_out_datanode_by_table(&stat_kvs, &leader_peer_ids);
85 if filter_result.is_empty() {
86 info!("The regions of the table cannot be allocated to completely different datanodes, table id: {}.", table_id);
87 stat_kvs
88 } else {
89 filter_result
90 }
91 } else {
92 stat_kvs
93 };
94
95 let mut weight_array = self.weight_compute.compute(&stat_kvs);
97
98 let mut exclude_peer_ids = self
100 .node_excluder
101 .excluded_datanode_ids()
102 .iter()
103 .cloned()
104 .collect::<HashSet<_>>();
105 exclude_peer_ids.extend(opts.exclude_peer_ids.iter());
106 filter_out_excluded_peers(&mut weight_array, &exclude_peer_ids);
107 let mut weighted_choose = RandomWeightedChoose::new(weight_array);
108 let selected = choose_items(&opts, &mut weighted_choose)?;
109
110 debug!(
111 "LoadBasedSelector select peers: {:?}, opts: {:?}.",
112 selected, opts,
113 );
114
115 Ok(selected)
116 }
117}
118
119fn filter_out_expired_datanode(
120 mut stat_kvs: HashMap<DatanodeStatKey, DatanodeStatValue>,
121 lease_kvs: &HashMap<DatanodeLeaseKey, LeaseValue>,
122) -> HashMap<DatanodeStatKey, DatanodeStatValue> {
123 lease_kvs
124 .iter()
125 .filter_map(|(lease_k, _)| stat_kvs.remove_entry(&lease_k.into()))
126 .collect()
127}
128
129fn filter_out_datanode_by_table(
130 stat_kvs: &HashMap<DatanodeStatKey, DatanodeStatValue>,
131 leader_peer_ids: &[u64],
132) -> HashMap<DatanodeStatKey, DatanodeStatValue> {
133 stat_kvs
134 .iter()
135 .filter(|(stat_k, _)| leader_peer_ids.contains(&stat_k.node_id))
136 .map(|(stat_k, stat_v)| (*stat_k, stat_v.clone()))
137 .collect()
138}
139
140async fn get_leader_peer_ids(
141 table_metadata_manager: &TableMetadataManager,
142 table_id: TableId,
143) -> Result<Vec<u64>> {
144 table_metadata_manager
145 .table_route_manager()
146 .table_route_storage()
147 .get(table_id)
148 .await
149 .context(error::TableMetadataManagerSnafu)
150 .map(|route| {
151 route.map_or_else(
152 || Ok(Vec::new()),
153 |route| {
154 let region_routes = route
155 .region_routes()
156 .context(error::UnexpectedLogicalRouteTableSnafu { err_msg: "" })?;
157 Ok(find_leaders(region_routes)
158 .into_iter()
159 .map(|peer| peer.id)
160 .collect())
161 },
162 )
163 })?
164}
165
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use api::v1::meta::heartbeat_request::NodeWorkloads;
    use api::v1::meta::DatanodeWorkloads;
    use common_meta::datanode::{DatanodeStatKey, DatanodeStatValue};
    use common_workload::DatanodeWorkloadType;

    use crate::key::{DatanodeLeaseKey, LeaseValue};
    use crate::selector::load_based::filter_out_expired_datanode;

    /// Only the stats of a datanode that has a lease may survive the filter.
    #[test]
    fn test_filter_out_expired_datanode() {
        // Stats are present for datanodes 0, 1 and 2 ...
        let stat_kvs: HashMap<_, _> = (0..3)
            .map(|node_id| {
                (
                    DatanodeStatKey { node_id },
                    DatanodeStatValue { stats: vec![] },
                )
            })
            .collect();

        // ... but only datanode 1 has a lease.
        let lease_kvs = HashMap::from([(
            DatanodeLeaseKey { node_id: 1 },
            LeaseValue {
                timestamp_millis: 0,
                node_addr: "127.0.0.1:3002".to_string(),
                workloads: NodeWorkloads::Datanode(DatanodeWorkloads {
                    types: vec![DatanodeWorkloadType::Hybrid.to_i32()],
                }),
            },
        )]);

        let alive_stat_kvs = filter_out_expired_datanode(stat_kvs, &lease_kvs);

        assert_eq!(1, alive_stat_kvs.len());
        assert!(alive_stat_kvs.contains_key(&DatanodeStatKey { node_id: 1 }));
    }
}