meta_srv/selector/
lease_based.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16use std::sync::Arc;
17use std::time::Duration;
18
19use common_meta::peer::Peer;
20
21use crate::error::Result;
22use crate::lease;
23use crate::metasrv::SelectorContext;
24use crate::node_excluder::NodeExcluderRef;
25use crate::selector::common::{choose_items, filter_out_excluded_peers};
26use crate::selector::weighted_choose::{RandomWeightedChoose, WeightedItem};
27use crate::selector::{Selector, SelectorOptions};
28
29/// Select all alive datanodes based using a random weighted choose.
30pub struct LeaseBasedSelector {
31    node_excluder: NodeExcluderRef,
32}
33
34impl LeaseBasedSelector {
35    pub fn new(node_excluder: NodeExcluderRef) -> Self {
36        Self { node_excluder }
37    }
38}
39
40impl Default for LeaseBasedSelector {
41    fn default() -> Self {
42        Self {
43            node_excluder: Arc::new(Vec::new()),
44        }
45    }
46}
47
48#[async_trait::async_trait]
49impl Selector for LeaseBasedSelector {
50    type Context = SelectorContext;
51    type Output = Vec<Peer>;
52
53    async fn select(&self, ctx: &Self::Context, opts: SelectorOptions) -> Result<Self::Output> {
54        // 1. get alive datanodes.
55        let lease_kvs = lease::alive_datanodes(
56            &ctx.meta_peer_client,
57            Duration::from_secs(ctx.datanode_lease_secs),
58        )
59        .with_condition(lease::is_datanode_accept_ingest_workload)
60        .await?;
61
62        // 2. compute weight array, but the weight of each item is the same.
63        let mut weight_array = lease_kvs
64            .into_iter()
65            .map(|(k, v)| WeightedItem {
66                item: Peer {
67                    id: k.node_id,
68                    addr: v.node_addr.clone(),
69                },
70                weight: 1.0,
71            })
72            .collect();
73
74        // 3. choose peers by weight_array.
75        let mut exclude_peer_ids = self
76            .node_excluder
77            .excluded_datanode_ids()
78            .iter()
79            .cloned()
80            .collect::<HashSet<_>>();
81        exclude_peer_ids.extend(opts.exclude_peer_ids.iter());
82        filter_out_excluded_peers(&mut weight_array, &exclude_peer_ids);
83        let mut weighted_choose = RandomWeightedChoose::new(weight_array);
84        let selected = choose_items(&opts, &mut weighted_choose)?;
85
86        Ok(selected)
87    }
88}