meta_srv/selector/
lease_based.rs1use std::collections::HashSet;
16use std::sync::Arc;
17
18use common_meta::peer::Peer;
19
20use crate::error::Result;
21use crate::lease;
22use crate::metasrv::SelectorContext;
23use crate::node_excluder::NodeExcluderRef;
24use crate::selector::common::{choose_items, filter_out_excluded_peers};
25use crate::selector::weighted_choose::{RandomWeightedChoose, WeightedItem};
26use crate::selector::{Selector, SelectorOptions};
27
28pub struct LeaseBasedSelector {
30 node_excluder: NodeExcluderRef,
31}
32
33impl LeaseBasedSelector {
34 pub fn new(node_excluder: NodeExcluderRef) -> Self {
35 Self { node_excluder }
36 }
37}
38
39impl Default for LeaseBasedSelector {
40 fn default() -> Self {
41 Self {
42 node_excluder: Arc::new(Vec::new()),
43 }
44 }
45}
46
47#[async_trait::async_trait]
48impl Selector for LeaseBasedSelector {
49 type Context = SelectorContext;
50 type Output = Vec<Peer>;
51
52 async fn select(&self, ctx: &Self::Context, opts: SelectorOptions) -> Result<Self::Output> {
53 let lease_kvs = lease::alive_datanodes(&ctx.meta_peer_client, ctx.datanode_lease_secs)
55 .with_condition(lease::is_datanode_accept_ingest_workload)
56 .await?;
57
58 let mut weight_array = lease_kvs
60 .into_iter()
61 .map(|(k, v)| WeightedItem {
62 item: Peer {
63 id: k.node_id,
64 addr: v.node_addr.clone(),
65 },
66 weight: 1.0,
67 })
68 .collect();
69
70 let mut exclude_peer_ids = self
72 .node_excluder
73 .excluded_datanode_ids()
74 .iter()
75 .cloned()
76 .collect::<HashSet<_>>();
77 exclude_peer_ids.extend(opts.exclude_peer_ids.iter());
78 filter_out_excluded_peers(&mut weight_array, &exclude_peer_ids);
79 let mut weighted_choose = RandomWeightedChoose::new(weight_array);
80 let selected = choose_items(&opts, &mut weighted_choose)?;
81
82 Ok(selected)
83 }
84}