meta_srv/selector/
load_based.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17
18use common_meta::datanode::{DatanodeStatKey, DatanodeStatValue};
19use common_meta::key::TableMetadataManager;
20use common_meta::peer::Peer;
21use common_meta::rpc::router::find_leaders;
22use common_telemetry::{debug, info};
23use snafu::ResultExt;
24use table::metadata::TableId;
25
26use crate::error::{self, Result};
27use crate::key::{DatanodeLeaseKey, LeaseValue};
28use crate::lease;
29use crate::metasrv::SelectorContext;
30use crate::node_excluder::NodeExcluderRef;
31use crate::selector::common::{choose_items, filter_out_excluded_peers};
32use crate::selector::weight_compute::{RegionNumsBasedWeightCompute, WeightCompute};
33use crate::selector::weighted_choose::RandomWeightedChoose;
34use crate::selector::{Selector, SelectorOptions};
35
36pub struct LoadBasedSelector<C> {
37    weight_compute: C,
38    node_excluder: NodeExcluderRef,
39}
40
41impl<C> LoadBasedSelector<C> {
42    pub fn new(weight_compute: C, node_excluder: NodeExcluderRef) -> Self {
43        Self {
44            weight_compute,
45            node_excluder,
46        }
47    }
48}
49
50impl Default for LoadBasedSelector<RegionNumsBasedWeightCompute> {
51    fn default() -> Self {
52        Self {
53            weight_compute: RegionNumsBasedWeightCompute,
54            node_excluder: Arc::new(Vec::new()),
55        }
56    }
57}
58
59#[async_trait::async_trait]
60impl<C> Selector for LoadBasedSelector<C>
61where
62    C: WeightCompute<Source = HashMap<DatanodeStatKey, DatanodeStatValue>>,
63{
64    type Context = SelectorContext;
65    type Output = Vec<Peer>;
66
67    async fn select(&self, ctx: &Self::Context, opts: SelectorOptions) -> Result<Self::Output> {
68        // 1. get alive datanodes.
69        let lease_kvs = lease::alive_datanodes(&ctx.meta_peer_client, ctx.datanode_lease_secs)
70            .with_condition(lease::is_datanode_accept_ingest_workload)
71            .await?;
72
73        // 2. get stat kvs and filter out expired datanodes.
74        let stat_keys = lease_kvs.keys().map(|k| k.into()).collect();
75        let stat_kvs = filter_out_expired_datanode(
76            ctx.meta_peer_client.get_dn_stat_kvs(stat_keys).await?,
77            &lease_kvs,
78        );
79
80        // 3. try to make the regions of a table distributed on different datanodes as much as possible.
81        let stat_kvs = if let Some(table_id) = ctx.table_id {
82            let table_metadata_manager = TableMetadataManager::new(ctx.kv_backend.clone());
83            let leader_peer_ids = get_leader_peer_ids(&table_metadata_manager, table_id).await?;
84            let filter_result = filter_out_datanode_by_table(&stat_kvs, &leader_peer_ids);
85            if filter_result.is_empty() {
86                info!("The regions of the table cannot be allocated to completely different datanodes, table id: {}.", table_id);
87                stat_kvs
88            } else {
89                filter_result
90            }
91        } else {
92            stat_kvs
93        };
94
95        // 4. compute weight array.
96        let mut weight_array = self.weight_compute.compute(&stat_kvs);
97
98        // 5. choose peers by weight_array.
99        let mut exclude_peer_ids = self
100            .node_excluder
101            .excluded_datanode_ids()
102            .iter()
103            .cloned()
104            .collect::<HashSet<_>>();
105        exclude_peer_ids.extend(opts.exclude_peer_ids.iter());
106        filter_out_excluded_peers(&mut weight_array, &exclude_peer_ids);
107        let mut weighted_choose = RandomWeightedChoose::new(weight_array);
108        let selected = choose_items(&opts, &mut weighted_choose)?;
109
110        debug!(
111            "LoadBasedSelector select peers: {:?}, opts: {:?}.",
112            selected, opts,
113        );
114
115        Ok(selected)
116    }
117}
118
119fn filter_out_expired_datanode(
120    mut stat_kvs: HashMap<DatanodeStatKey, DatanodeStatValue>,
121    lease_kvs: &HashMap<DatanodeLeaseKey, LeaseValue>,
122) -> HashMap<DatanodeStatKey, DatanodeStatValue> {
123    lease_kvs
124        .iter()
125        .filter_map(|(lease_k, _)| stat_kvs.remove_entry(&lease_k.into()))
126        .collect()
127}
128
129fn filter_out_datanode_by_table(
130    stat_kvs: &HashMap<DatanodeStatKey, DatanodeStatValue>,
131    leader_peer_ids: &[u64],
132) -> HashMap<DatanodeStatKey, DatanodeStatValue> {
133    stat_kvs
134        .iter()
135        .filter(|(stat_k, _)| leader_peer_ids.contains(&stat_k.node_id))
136        .map(|(stat_k, stat_v)| (*stat_k, stat_v.clone()))
137        .collect()
138}
139
140async fn get_leader_peer_ids(
141    table_metadata_manager: &TableMetadataManager,
142    table_id: TableId,
143) -> Result<Vec<u64>> {
144    table_metadata_manager
145        .table_route_manager()
146        .table_route_storage()
147        .get(table_id)
148        .await
149        .context(error::TableMetadataManagerSnafu)
150        .map(|route| {
151            route.map_or_else(
152                || Ok(Vec::new()),
153                |route| {
154                    let region_routes = route
155                        .region_routes()
156                        .context(error::UnexpectedLogicalRouteTableSnafu { err_msg: "" })?;
157                    Ok(find_leaders(region_routes)
158                        .into_iter()
159                        .map(|peer| peer.id)
160                        .collect())
161                },
162            )
163        })?
164}
165
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use api::v1::meta::heartbeat_request::NodeWorkloads;
    use api::v1::meta::DatanodeWorkloads;
    use common_meta::datanode::{DatanodeStatKey, DatanodeStatValue};
    use common_workload::DatanodeWorkloadType;

    use crate::key::{DatanodeLeaseKey, LeaseValue};
    use crate::selector::load_based::filter_out_expired_datanode;

    /// Only stats belonging to datanodes that still hold a lease survive the filter.
    #[test]
    fn test_filter_out_expired_datanode() {
        // Stats reported by datanodes 0, 1 and 2.
        let stat_kvs: HashMap<DatanodeStatKey, DatanodeStatValue> = (0..3)
            .map(|node_id| {
                (
                    DatanodeStatKey { node_id },
                    DatanodeStatValue { stats: vec![] },
                )
            })
            .collect();

        // Only datanode 1 holds a lease.
        let lease_kvs = HashMap::from([(
            DatanodeLeaseKey { node_id: 1 },
            LeaseValue {
                timestamp_millis: 0,
                node_addr: "127.0.0.1:3002".to_string(),
                workloads: NodeWorkloads::Datanode(DatanodeWorkloads {
                    types: vec![DatanodeWorkloadType::Hybrid.to_i32()],
                }),
            },
        )]);

        let alive_stat_kvs = filter_out_expired_datanode(stat_kvs, &lease_kvs);

        assert_eq!(1, alive_stat_kvs.len());
        assert!(alive_stat_kvs.contains_key(&DatanodeStatKey { node_id: 1 }));
    }
}