meta_srv/selector/
load_based.rs1use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17use std::time::Duration;
18
19use common_meta::datanode::{DatanodeStatKey, DatanodeStatValue};
20use common_meta::key::TableMetadataManager;
21use common_meta::peer::Peer;
22use common_meta::rpc::router::find_leaders;
23use common_telemetry::{debug, info};
24use snafu::ResultExt;
25use table::metadata::TableId;
26
27use crate::error::{self, Result};
28use crate::key::{DatanodeLeaseKey, LeaseValue};
29use crate::lease;
30use crate::metasrv::SelectorContext;
31use crate::node_excluder::NodeExcluderRef;
32use crate::selector::common::{choose_items, filter_out_excluded_peers};
33use crate::selector::weight_compute::{RegionNumsBasedWeightCompute, WeightCompute};
34use crate::selector::weighted_choose::RandomWeightedChoose;
35use crate::selector::{Selector, SelectorOptions};
36
37pub struct LoadBasedSelector<C> {
38 weight_compute: C,
39 node_excluder: NodeExcluderRef,
40}
41
42impl<C> LoadBasedSelector<C> {
43 pub fn new(weight_compute: C, node_excluder: NodeExcluderRef) -> Self {
44 Self {
45 weight_compute,
46 node_excluder,
47 }
48 }
49}
50
51impl Default for LoadBasedSelector<RegionNumsBasedWeightCompute> {
52 fn default() -> Self {
53 Self {
54 weight_compute: RegionNumsBasedWeightCompute,
55 node_excluder: Arc::new(Vec::new()),
56 }
57 }
58}
59
60#[async_trait::async_trait]
61impl<C> Selector for LoadBasedSelector<C>
62where
63 C: WeightCompute<Source = HashMap<DatanodeStatKey, DatanodeStatValue>>,
64{
65 type Context = SelectorContext;
66 type Output = Vec<Peer>;
67
68 async fn select(&self, ctx: &Self::Context, opts: SelectorOptions) -> Result<Self::Output> {
69 let lease_kvs = lease::alive_datanodes(
71 &ctx.meta_peer_client,
72 Duration::from_secs(ctx.datanode_lease_secs),
73 )
74 .with_condition(lease::is_datanode_accept_ingest_workload)
75 .await?;
76
77 let stat_keys = lease_kvs.keys().map(|k| k.into()).collect();
79 let stat_kvs = filter_out_expired_datanode(
80 ctx.meta_peer_client.get_dn_stat_kvs(stat_keys).await?,
81 &lease_kvs,
82 );
83
84 let stat_kvs = if let Some(table_id) = ctx.table_id {
86 let table_metadata_manager = TableMetadataManager::new(ctx.kv_backend.clone());
87 let leader_peer_ids = get_leader_peer_ids(&table_metadata_manager, table_id).await?;
88 let filter_result = filter_out_datanode_by_table(&stat_kvs, &leader_peer_ids);
89 if filter_result.is_empty() {
90 info!("The regions of the table cannot be allocated to completely different datanodes, table id: {}.", table_id);
91 stat_kvs
92 } else {
93 filter_result
94 }
95 } else {
96 stat_kvs
97 };
98
99 let mut weight_array = self.weight_compute.compute(&stat_kvs);
101
102 let mut exclude_peer_ids = self
104 .node_excluder
105 .excluded_datanode_ids()
106 .iter()
107 .cloned()
108 .collect::<HashSet<_>>();
109 exclude_peer_ids.extend(opts.exclude_peer_ids.iter());
110 filter_out_excluded_peers(&mut weight_array, &exclude_peer_ids);
111 let mut weighted_choose = RandomWeightedChoose::new(weight_array);
112 let selected = choose_items(&opts, &mut weighted_choose)?;
113
114 debug!(
115 "LoadBasedSelector select peers: {:?}, opts: {:?}.",
116 selected, opts,
117 );
118
119 Ok(selected)
120 }
121}
122
123fn filter_out_expired_datanode(
124 mut stat_kvs: HashMap<DatanodeStatKey, DatanodeStatValue>,
125 lease_kvs: &HashMap<DatanodeLeaseKey, LeaseValue>,
126) -> HashMap<DatanodeStatKey, DatanodeStatValue> {
127 lease_kvs
128 .iter()
129 .filter_map(|(lease_k, _)| stat_kvs.remove_entry(&lease_k.into()))
130 .collect()
131}
132
133fn filter_out_datanode_by_table(
134 stat_kvs: &HashMap<DatanodeStatKey, DatanodeStatValue>,
135 leader_peer_ids: &[u64],
136) -> HashMap<DatanodeStatKey, DatanodeStatValue> {
137 stat_kvs
138 .iter()
139 .filter(|(stat_k, _)| leader_peer_ids.contains(&stat_k.node_id))
140 .map(|(stat_k, stat_v)| (*stat_k, stat_v.clone()))
141 .collect()
142}
143
144async fn get_leader_peer_ids(
145 table_metadata_manager: &TableMetadataManager,
146 table_id: TableId,
147) -> Result<Vec<u64>> {
148 table_metadata_manager
149 .table_route_manager()
150 .table_route_storage()
151 .get(table_id)
152 .await
153 .context(error::TableMetadataManagerSnafu)
154 .map(|route| {
155 route.map_or_else(
156 || Ok(Vec::new()),
157 |route| {
158 let region_routes = route
159 .region_routes()
160 .context(error::UnexpectedLogicalRouteTableSnafu { err_msg: "" })?;
161 Ok(find_leaders(region_routes)
162 .into_iter()
163 .map(|peer| peer.id)
164 .collect())
165 },
166 )
167 })?
168}
169
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use api::v1::meta::heartbeat_request::NodeWorkloads;
    use api::v1::meta::DatanodeWorkloads;
    use common_meta::datanode::{DatanodeStatKey, DatanodeStatValue};
    use common_workload::DatanodeWorkloadType;

    use crate::key::{DatanodeLeaseKey, LeaseValue};
    use crate::selector::load_based::filter_out_expired_datanode;

    /// Only stats whose datanode appears in the lease map survive the filter.
    #[test]
    fn test_filter_out_expired_datanode() {
        // Stats exist for datanodes 0, 1 and 2 ...
        let stat_kvs: HashMap<DatanodeStatKey, DatanodeStatValue> = (0..3)
            .map(|node_id| {
                (
                    DatanodeStatKey { node_id },
                    DatanodeStatValue { stats: vec![] },
                )
            })
            .collect();

        // ... but only datanode 1 holds a lease.
        let lease_value = LeaseValue {
            timestamp_millis: 0,
            node_addr: "127.0.0.1:3002".to_string(),
            workloads: NodeWorkloads::Datanode(DatanodeWorkloads {
                types: vec![DatanodeWorkloadType::Hybrid.to_i32()],
            }),
        };
        let lease_kvs = HashMap::from([(DatanodeLeaseKey { node_id: 1 }, lease_value)]);

        let alive_stat_kvs = filter_out_expired_datanode(stat_kvs, &lease_kvs);

        assert_eq!(1, alive_stat_kvs.len());
        assert!(alive_stat_kvs.contains_key(&DatanodeStatKey { node_id: 1 }));
    }
}