meta_srv/selector/
lease_based.rs1use std::collections::HashSet;
16use std::sync::Arc;
17use std::time::Duration;
18
19use common_meta::peer::Peer;
20
21use crate::error::Result;
22use crate::lease;
23use crate::metasrv::SelectorContext;
24use crate::node_excluder::NodeExcluderRef;
25use crate::selector::common::{choose_items, filter_out_excluded_peers};
26use crate::selector::weighted_choose::{RandomWeightedChoose, WeightedItem};
27use crate::selector::{Selector, SelectorOptions};
28
29pub struct LeaseBasedSelector {
31 node_excluder: NodeExcluderRef,
32}
33
34impl LeaseBasedSelector {
35 pub fn new(node_excluder: NodeExcluderRef) -> Self {
36 Self { node_excluder }
37 }
38}
39
40impl Default for LeaseBasedSelector {
41 fn default() -> Self {
42 Self {
43 node_excluder: Arc::new(Vec::new()),
44 }
45 }
46}
47
48#[async_trait::async_trait]
49impl Selector for LeaseBasedSelector {
50 type Context = SelectorContext;
51 type Output = Vec<Peer>;
52
53 async fn select(&self, ctx: &Self::Context, opts: SelectorOptions) -> Result<Self::Output> {
54 let lease_kvs = lease::alive_datanodes(
56 &ctx.meta_peer_client,
57 Duration::from_secs(ctx.datanode_lease_secs),
58 )
59 .with_condition(lease::is_datanode_accept_ingest_workload)
60 .await?;
61
62 let mut weight_array = lease_kvs
64 .into_iter()
65 .map(|(k, v)| WeightedItem {
66 item: Peer {
67 id: k.node_id,
68 addr: v.node_addr.clone(),
69 },
70 weight: 1.0,
71 })
72 .collect();
73
74 let mut exclude_peer_ids = self
76 .node_excluder
77 .excluded_datanode_ids()
78 .iter()
79 .cloned()
80 .collect::<HashSet<_>>();
81 exclude_peer_ids.extend(opts.exclude_peer_ids.iter());
82 filter_out_excluded_peers(&mut weight_array, &exclude_peer_ids);
83 let mut weighted_choose = RandomWeightedChoose::new(weight_array);
84 let selected = choose_items(&opts, &mut weighted_choose)?;
85
86 Ok(selected)
87 }
88}