meta_srv/selector/
load_based.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17use std::time::Duration;
18
19use common_meta::datanode::{DatanodeStatKey, DatanodeStatValue};
20use common_meta::key::TableMetadataManager;
21use common_meta::peer::Peer;
22use common_meta::rpc::router::find_leaders;
23use common_telemetry::{debug, info};
24use snafu::ResultExt;
25use table::metadata::TableId;
26
27use crate::error::{self, Result};
28use crate::key::{DatanodeLeaseKey, LeaseValue};
29use crate::lease;
30use crate::metasrv::SelectorContext;
31use crate::node_excluder::NodeExcluderRef;
32use crate::selector::common::{choose_items, filter_out_excluded_peers};
33use crate::selector::weight_compute::{RegionNumsBasedWeightCompute, WeightCompute};
34use crate::selector::weighted_choose::RandomWeightedChoose;
35use crate::selector::{Selector, SelectorOptions};
36
37pub struct LoadBasedSelector<C> {
38    weight_compute: C,
39    node_excluder: NodeExcluderRef,
40}
41
42impl<C> LoadBasedSelector<C> {
43    pub fn new(weight_compute: C, node_excluder: NodeExcluderRef) -> Self {
44        Self {
45            weight_compute,
46            node_excluder,
47        }
48    }
49}
50
51impl Default for LoadBasedSelector<RegionNumsBasedWeightCompute> {
52    fn default() -> Self {
53        Self {
54            weight_compute: RegionNumsBasedWeightCompute,
55            node_excluder: Arc::new(Vec::new()),
56        }
57    }
58}
59
60#[async_trait::async_trait]
61impl<C> Selector for LoadBasedSelector<C>
62where
63    C: WeightCompute<Source = HashMap<DatanodeStatKey, DatanodeStatValue>>,
64{
65    type Context = SelectorContext;
66    type Output = Vec<Peer>;
67
68    async fn select(&self, ctx: &Self::Context, opts: SelectorOptions) -> Result<Self::Output> {
69        // 1. get alive datanodes.
70        let lease_kvs = lease::alive_datanodes(
71            &ctx.meta_peer_client,
72            Duration::from_secs(ctx.datanode_lease_secs),
73        )
74        .with_condition(lease::is_datanode_accept_ingest_workload)
75        .await?;
76
77        // 2. get stat kvs and filter out expired datanodes.
78        let stat_keys = lease_kvs.keys().map(|k| k.into()).collect();
79        let stat_kvs = filter_out_expired_datanode(
80            ctx.meta_peer_client.get_dn_stat_kvs(stat_keys).await?,
81            &lease_kvs,
82        );
83
84        // 3. try to make the regions of a table distributed on different datanodes as much as possible.
85        let stat_kvs = if let Some(table_id) = ctx.table_id {
86            let table_metadata_manager = TableMetadataManager::new(ctx.kv_backend.clone());
87            let leader_peer_ids = get_leader_peer_ids(&table_metadata_manager, table_id).await?;
88            let filter_result = filter_out_datanode_by_table(&stat_kvs, &leader_peer_ids);
89            if filter_result.is_empty() {
90                info!("The regions of the table cannot be allocated to completely different datanodes, table id: {}.", table_id);
91                stat_kvs
92            } else {
93                filter_result
94            }
95        } else {
96            stat_kvs
97        };
98
99        // 4. compute weight array.
100        let mut weight_array = self.weight_compute.compute(&stat_kvs);
101
102        // 5. choose peers by weight_array.
103        let mut exclude_peer_ids = self
104            .node_excluder
105            .excluded_datanode_ids()
106            .iter()
107            .cloned()
108            .collect::<HashSet<_>>();
109        exclude_peer_ids.extend(opts.exclude_peer_ids.iter());
110        filter_out_excluded_peers(&mut weight_array, &exclude_peer_ids);
111        let mut weighted_choose = RandomWeightedChoose::new(weight_array);
112        let selected = choose_items(&opts, &mut weighted_choose)?;
113
114        debug!(
115            "LoadBasedSelector select peers: {:?}, opts: {:?}.",
116            selected, opts,
117        );
118
119        Ok(selected)
120    }
121}
122
123fn filter_out_expired_datanode(
124    mut stat_kvs: HashMap<DatanodeStatKey, DatanodeStatValue>,
125    lease_kvs: &HashMap<DatanodeLeaseKey, LeaseValue>,
126) -> HashMap<DatanodeStatKey, DatanodeStatValue> {
127    lease_kvs
128        .iter()
129        .filter_map(|(lease_k, _)| stat_kvs.remove_entry(&lease_k.into()))
130        .collect()
131}
132
133fn filter_out_datanode_by_table(
134    stat_kvs: &HashMap<DatanodeStatKey, DatanodeStatValue>,
135    leader_peer_ids: &[u64],
136) -> HashMap<DatanodeStatKey, DatanodeStatValue> {
137    stat_kvs
138        .iter()
139        .filter(|(stat_k, _)| leader_peer_ids.contains(&stat_k.node_id))
140        .map(|(stat_k, stat_v)| (*stat_k, stat_v.clone()))
141        .collect()
142}
143
144async fn get_leader_peer_ids(
145    table_metadata_manager: &TableMetadataManager,
146    table_id: TableId,
147) -> Result<Vec<u64>> {
148    table_metadata_manager
149        .table_route_manager()
150        .table_route_storage()
151        .get(table_id)
152        .await
153        .context(error::TableMetadataManagerSnafu)
154        .map(|route| {
155            route.map_or_else(
156                || Ok(Vec::new()),
157                |route| {
158                    let region_routes = route
159                        .region_routes()
160                        .context(error::UnexpectedLogicalRouteTableSnafu { err_msg: "" })?;
161                    Ok(find_leaders(region_routes)
162                        .into_iter()
163                        .map(|peer| peer.id)
164                        .collect())
165                },
166            )
167        })?
168}
169
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use api::v1::meta::heartbeat_request::NodeWorkloads;
    use api::v1::meta::DatanodeWorkloads;
    use common_meta::datanode::{DatanodeStatKey, DatanodeStatValue};
    use common_workload::DatanodeWorkloadType;

    use crate::key::{DatanodeLeaseKey, LeaseValue};
    use crate::selector::load_based::filter_out_expired_datanode;

    /// Only the datanode that has a lease keeps its stats.
    #[test]
    fn test_filter_out_expired_datanode() {
        // Stats are present for datanodes 0, 1 and 2 ...
        let stat_kvs: HashMap<_, _> = (0..3)
            .map(|node_id| {
                (
                    DatanodeStatKey { node_id },
                    DatanodeStatValue { stats: vec![] },
                )
            })
            .collect();

        // ... but a lease exists for datanode 1 only.
        let workloads = NodeWorkloads::Datanode(DatanodeWorkloads {
            types: vec![DatanodeWorkloadType::Hybrid.to_i32()],
        });
        let lease_kvs = HashMap::from([(
            DatanodeLeaseKey { node_id: 1 },
            LeaseValue {
                timestamp_millis: 0,
                node_addr: "127.0.0.1:3002".to_string(),
                workloads,
            },
        )]);

        let alive_stat_kvs = filter_out_expired_datanode(stat_kvs, &lease_kvs);

        assert_eq!(1, alive_stat_kvs.len());
        assert!(alive_stat_kvs.contains_key(&DatanodeStatKey { node_id: 1 }));
    }
}